Skip to main content

aft/db/
mod.rs

1use rusqlite::{Connection, TransactionBehavior};
2use std::fmt;
3use std::fs;
4use std::path::Path;
5
6pub mod backups;
7pub mod bash_tasks;
8pub mod compression_events;
9pub mod state;
10
11pub const CURRENT_SCHEMA_VERSION: u32 = 1;
12
13const MIGRATION_V1: &str = r#"
14CREATE TABLE IF NOT EXISTS schema_version (
15  version INTEGER NOT NULL PRIMARY KEY
16);
17
18CREATE TABLE IF NOT EXISTS bash_tasks (
19  harness      TEXT NOT NULL,
20  session_id   TEXT NOT NULL,
21  task_id      TEXT NOT NULL,
22  project_key  TEXT NOT NULL,
23  command      TEXT NOT NULL,
24  cwd          TEXT NOT NULL,
25  status       TEXT NOT NULL,
26  exit_code    INTEGER,
27  pid          INTEGER,
28  pgid         INTEGER,
29  started_at   INTEGER NOT NULL,
30  completed_at INTEGER,
31  stdout_path  TEXT,
32  stderr_path  TEXT,
33  compressed   INTEGER NOT NULL DEFAULT 1,
34  timeout_ms   INTEGER,
35  completion_delivered INTEGER NOT NULL DEFAULT 0,
36  output_bytes INTEGER,
37  metadata     TEXT,
38  PRIMARY KEY (harness, session_id, task_id)
39);
40CREATE INDEX IF NOT EXISTS idx_bash_tasks_project_key ON bash_tasks(project_key);
41CREATE INDEX IF NOT EXISTS idx_bash_tasks_status      ON bash_tasks(status);
42CREATE INDEX IF NOT EXISTS idx_bash_tasks_session_status ON bash_tasks(harness, session_id, status);
43
44CREATE TABLE IF NOT EXISTS compression_events (
45  id                INTEGER PRIMARY KEY AUTOINCREMENT,
46  harness           TEXT NOT NULL,
47  session_id        TEXT,
48  project_key       TEXT NOT NULL,
49  tool              TEXT NOT NULL,
50  task_id           TEXT,
51  command           TEXT,
52  compressor        TEXT NOT NULL,
53  original_bytes    INTEGER NOT NULL,
54  compressed_bytes  INTEGER NOT NULL,
55  original_tokens   INTEGER NOT NULL,
56  compressed_tokens INTEGER NOT NULL,
57  created_at        INTEGER NOT NULL
58);
59CREATE INDEX IF NOT EXISTS idx_compression_session         ON compression_events(harness, session_id);
60CREATE INDEX IF NOT EXISTS idx_compression_session_created ON compression_events(harness, session_id, created_at);
61CREATE INDEX IF NOT EXISTS idx_compression_project_key     ON compression_events(project_key);
62
63CREATE TABLE IF NOT EXISTS backups (
64  id            INTEGER PRIMARY KEY AUTOINCREMENT,
65  backup_id     TEXT,
66  harness       TEXT NOT NULL,
67  session_id    TEXT NOT NULL,
68  project_key   TEXT NOT NULL,
69  op_id         TEXT,
70  order_blob    BLOB NOT NULL,
71  file_path     TEXT NOT NULL,
72  path_hash     TEXT NOT NULL,
73  backup_path   TEXT,
74  kind          TEXT NOT NULL,
75  description   TEXT,
76  created_at    INTEGER NOT NULL,
77  is_tombstone  INTEGER NOT NULL DEFAULT 0
78);
79CREATE INDEX IF NOT EXISTS idx_backups_session_path  ON backups(harness, session_id, path_hash);
80CREATE INDEX IF NOT EXISTS idx_backups_session_op    ON backups(harness, session_id, op_id) WHERE op_id IS NOT NULL;
81CREATE INDEX IF NOT EXISTS idx_backups_session_order ON backups(harness, session_id, order_blob DESC);
82CREATE INDEX IF NOT EXISTS idx_backups_session_path_order ON backups(harness, session_id, path_hash, order_blob DESC);
83
84CREATE TABLE IF NOT EXISTS harness_state (
85  harness    TEXT NOT NULL,
86  key        TEXT NOT NULL,
87  value      TEXT NOT NULL,
88  updated_at INTEGER NOT NULL,
89  PRIMARY KEY (harness, key)
90);
91
92CREATE TABLE IF NOT EXISTS host_state (
93  key        TEXT NOT NULL PRIMARY KEY,
94  value      TEXT NOT NULL,
95  updated_at INTEGER NOT NULL
96);
97"#;
98
99#[derive(Debug)]
100pub enum OpenError {
101    Io(std::io::Error),
102    Sqlite(rusqlite::Error),
103    DowngradeRefused {
104        db_version: u32,
105        supported: u32,
106    },
107    MigrationFailed {
108        from: u32,
109        to: u32,
110        error: rusqlite::Error,
111    },
112}
113
114impl fmt::Display for OpenError {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        match self {
117            OpenError::Io(error) => write!(f, "database I/O error: {error}"),
118            OpenError::Sqlite(error) => write!(f, "sqlite error: {error}"),
119            OpenError::DowngradeRefused {
120                db_version,
121                supported,
122            } => write!(
123                f,
124                "database schema version {db_version} is newer than supported version {supported}"
125            ),
126            OpenError::MigrationFailed { from, to, error } => {
127                write!(f, "database migration {from}->{to} failed: {error}")
128            }
129        }
130    }
131}
132
133impl std::error::Error for OpenError {}
134
135impl From<std::io::Error> for OpenError {
136    fn from(error: std::io::Error) -> Self {
137        OpenError::Io(error)
138    }
139}
140
141impl From<rusqlite::Error> for OpenError {
142    fn from(error: rusqlite::Error) -> Self {
143        OpenError::Sqlite(error)
144    }
145}
146
147/// Open or create the AFT SQLite database at the given path.
148///
149/// Applies per-connection PRAGMAs, runs schema migrations from the DB's
150/// current schema version up to [`CURRENT_SCHEMA_VERSION`], and returns the
151/// configured connection.
152pub fn open(path: &Path) -> Result<Connection, OpenError> {
153    if let Some(parent) = path.parent() {
154        if !parent.as_os_str().is_empty() {
155            fs::create_dir_all(parent)?;
156        }
157    }
158
159    let mut conn = Connection::open(path)?;
160    apply_pragmas(&conn)?;
161    run_migrations(&mut conn)?;
162    Ok(conn)
163}
164
165/// Apply the per-connection PRAGMAs required for every AFT SQLite connection.
166pub fn apply_pragmas(conn: &Connection) -> Result<(), rusqlite::Error> {
167    conn.pragma_update(None, "foreign_keys", "ON")?;
168    conn.pragma_update(None, "journal_mode", "WAL")?;
169    conn.pragma_update(None, "busy_timeout", 5000)?;
170    conn.pragma_update(None, "synchronous", "NORMAL")?;
171    Ok(())
172}
173
174/// Run forward-only migrations up to [`CURRENT_SCHEMA_VERSION`].
175///
176/// Returns the post-migration schema version. Refuses to open databases created
177/// by newer AFT versions.
178pub fn run_migrations(conn: &mut Connection) -> Result<u32, OpenError> {
179    conn.execute_batch(
180        "CREATE TABLE IF NOT EXISTS schema_version (version INTEGER NOT NULL PRIMARY KEY);",
181    )?;
182
183    let db_version = current_schema_version(conn)?;
184    if db_version > CURRENT_SCHEMA_VERSION {
185        return Err(OpenError::DowngradeRefused {
186            db_version,
187            supported: CURRENT_SCHEMA_VERSION,
188        });
189    }
190
191    for version in (db_version + 1)..=CURRENT_SCHEMA_VERSION {
192        apply_migration(conn, version)?;
193    }
194
195    Ok(current_schema_version(conn)?)
196}
197
198fn current_schema_version(conn: &Connection) -> Result<u32, rusqlite::Error> {
199    conn.query_row(
200        "SELECT COALESCE(MAX(version), 0) FROM schema_version",
201        [],
202        |row| row.get::<_, u32>(0),
203    )
204}
205
206fn apply_migration(conn: &mut Connection, version: u32) -> Result<(), OpenError> {
207    let from = version - 1;
208    let tx = conn
209        .transaction_with_behavior(TransactionBehavior::Immediate)
210        .map_err(|error| OpenError::MigrationFailed {
211            from,
212            to: version,
213            error,
214        })?;
215
216    let result = match version {
217        1 => tx.execute_batch(MIGRATION_V1),
218        _ => Ok(()),
219    }
220    .and_then(|()| {
221        tx.execute("DELETE FROM schema_version", [])?;
222        tx.execute(
223            "INSERT OR REPLACE INTO schema_version (version) VALUES (?1)",
224            [version],
225        )?;
226        tx.commit()
227    });
228
229    result.map_err(|error| OpenError::MigrationFailed {
230        from,
231        to: version,
232        error,
233    })
234}
235
236#[cfg(test)]
237mod tests {
238    use super::*;
239    use rusqlite::params;
240    use tempfile::tempdir;
241
242    const EXPECTED_TABLES: &[&str] = &[
243        "schema_version",
244        "bash_tasks",
245        "compression_events",
246        "backups",
247        "harness_state",
248        "host_state",
249    ];
250
251    const EXPECTED_INDEXES: &[&str] = &[
252        "idx_bash_tasks_project_key",
253        "idx_bash_tasks_status",
254        "idx_bash_tasks_session_status",
255        "idx_compression_session",
256        "idx_compression_session_created",
257        "idx_compression_project_key",
258        "idx_backups_session_path",
259        "idx_backups_session_op",
260        "idx_backups_session_order",
261        "idx_backups_session_path_order",
262    ];
263
264    #[test]
265    fn open_fresh_db_creates_all_tables() {
266        let dir = tempdir().unwrap();
267        let conn = open(&dir.path().join("aft.db")).unwrap();
268
269        let tables = sqlite_names(&conn, "table");
270        for table in EXPECTED_TABLES {
271            assert!(tables.contains(&table.to_string()), "missing table {table}");
272        }
273    }
274
275    #[test]
276    fn open_fresh_db_creates_all_indexes() {
277        let dir = tempdir().unwrap();
278        let conn = open(&dir.path().join("aft.db")).unwrap();
279
280        let indexes = sqlite_names(&conn, "index");
281        for index in EXPECTED_INDEXES {
282            assert!(
283                indexes.contains(&index.to_string()),
284                "missing index {index}"
285            );
286        }
287    }
288
289    #[test]
290    fn open_existing_db_is_idempotent() {
291        let dir = tempdir().unwrap();
292        let path = dir.path().join("aft.db");
293
294        let conn = open(&path).unwrap();
295        let first_version = schema_version(&conn);
296        drop(conn);
297
298        let conn = open(&path).unwrap();
299        assert_eq!(schema_version(&conn), first_version);
300    }
301
302    #[test]
303    fn pragmas_applied_correctly() {
304        let dir = tempdir().unwrap();
305        let conn = open(&dir.path().join("aft.db")).unwrap();
306
307        let foreign_keys: i64 = conn
308            .query_row("PRAGMA foreign_keys", [], |row| row.get(0))
309            .unwrap();
310        let journal_mode: String = conn
311            .query_row("PRAGMA journal_mode", [], |row| row.get(0))
312            .unwrap();
313        let busy_timeout: i64 = conn
314            .query_row("PRAGMA busy_timeout", [], |row| row.get(0))
315            .unwrap();
316        let synchronous: i64 = conn
317            .query_row("PRAGMA synchronous", [], |row| row.get(0))
318            .unwrap();
319
320        assert_eq!(foreign_keys, 1);
321        assert_eq!(journal_mode, "wal");
322        assert_eq!(busy_timeout, 5000);
323        assert_eq!(synchronous, 1);
324    }
325
326    #[test]
327    fn downgrade_refused() {
328        let dir = tempdir().unwrap();
329        let path = dir.path().join("aft.db");
330        let conn = open(&path).unwrap();
331        conn.execute("INSERT OR REPLACE INTO schema_version VALUES (999)", [])
332            .unwrap();
333        drop(conn);
334
335        match open(&path).unwrap_err() {
336            OpenError::DowngradeRefused {
337                db_version,
338                supported,
339            } => {
340                assert_eq!(db_version, 999);
341                assert_eq!(supported, CURRENT_SCHEMA_VERSION);
342            }
343            error => panic!("expected downgrade refusal, got {error:?}"),
344        }
345    }
346
347    #[test]
348    fn migration_runner_advances_version() {
349        let dir = tempdir().unwrap();
350        let conn = open(&dir.path().join("aft.db")).unwrap();
351
352        assert_eq!(schema_version(&conn), CURRENT_SCHEMA_VERSION);
353    }
354
355    #[test]
356    fn migration_runner_no_op_when_current() {
357        let dir = tempdir().unwrap();
358        let path = dir.path().join("aft.db");
359
360        let conn = open(&path).unwrap();
361        assert_eq!(schema_version_row_count(&conn), 1);
362        drop(conn);
363
364        let conn = open(&path).unwrap();
365        assert_eq!(schema_version(&conn), CURRENT_SCHEMA_VERSION);
366        assert_eq!(schema_version_row_count(&conn), 1);
367    }
368
369    #[test]
370    fn harness_state_compound_pk_works() {
371        let dir = tempdir().unwrap();
372        let conn = open(&dir.path().join("aft.db")).unwrap();
373
374        conn.execute(
375            "INSERT INTO harness_state (harness, key, value, updated_at) VALUES (?1, ?2, ?3, ?4)",
376            params!["opencode", "warned_tools", "{}", 1_i64],
377        )
378        .unwrap();
379        let duplicate = conn.execute(
380            "INSERT INTO harness_state (harness, key, value, updated_at) VALUES (?1, ?2, ?3, ?4)",
381            params!["opencode", "warned_tools", "{}", 2_i64],
382        );
383        assert_unique_constraint(duplicate);
384
385        conn.execute(
386            "INSERT INTO harness_state (harness, key, value, updated_at) VALUES (?1, ?2, ?3, ?4)",
387            params!["pi", "warned_tools", "{}", 3_i64],
388        )
389        .unwrap();
390    }
391
392    #[test]
393    fn host_state_simple_pk_works() {
394        let dir = tempdir().unwrap();
395        let conn = open(&dir.path().join("aft.db")).unwrap();
396
397        conn.execute(
398            "INSERT INTO host_state (key, value, updated_at) VALUES (?1, ?2, ?3)",
399            params!["trusted_filter_projects", "[]", 1_i64],
400        )
401        .unwrap();
402        let duplicate = conn.execute(
403            "INSERT INTO host_state (key, value, updated_at) VALUES (?1, ?2, ?3)",
404            params!["trusted_filter_projects", "[]", 2_i64],
405        );
406        assert_unique_constraint(duplicate);
407    }
408
409    #[test]
410    fn bash_tasks_compound_pk_works() {
411        let dir = tempdir().unwrap();
412        let conn = open(&dir.path().join("aft.db")).unwrap();
413
414        insert_bash_task(&conn, "opencode", "session-1", "bash-12345678").unwrap();
415        let duplicate = insert_bash_task(&conn, "opencode", "session-1", "bash-12345678");
416        assert_unique_constraint(duplicate);
417
418        insert_bash_task(&conn, "pi", "session-1", "bash-12345678").unwrap();
419    }
420
421    #[test]
422    fn backups_order_blob_sort() {
423        let dir = tempdir().unwrap();
424        let conn = open(&dir.path().join("aft.db")).unwrap();
425
426        let one = order_blob(1);
427        let two = order_blob(2);
428        let max = [0xFF; 16];
429
430        insert_backup(&conn, "one", &one).unwrap();
431        insert_backup(&conn, "two", &two).unwrap();
432        insert_backup(&conn, "max", &max).unwrap();
433
434        assert_eq!(backup_ids_ordered(&conn, "ASC"), vec!["one", "two", "max"]);
435        assert_eq!(backup_ids_ordered(&conn, "DESC"), vec!["max", "two", "one"]);
436    }
437
438    fn sqlite_names(conn: &Connection, kind: &str) -> Vec<String> {
439        let sql = match kind {
440            "table" => "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name",
441            "index" => "SELECT name FROM sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%' ORDER BY name",
442            _ => panic!("unsupported sqlite_master kind: {kind}"),
443        };
444        let mut stmt = conn.prepare(sql).unwrap();
445        stmt.query_map([], |row| row.get::<_, String>(0))
446            .unwrap()
447            .collect::<Result<Vec<_>, _>>()
448            .unwrap()
449    }
450
451    fn schema_version(conn: &Connection) -> u32 {
452        conn.query_row("SELECT version FROM schema_version", [], |row| row.get(0))
453            .unwrap()
454    }
455
456    fn schema_version_row_count(conn: &Connection) -> i64 {
457        conn.query_row("SELECT COUNT(*) FROM schema_version", [], |row| row.get(0))
458            .unwrap()
459    }
460
461    fn assert_unique_constraint(result: rusqlite::Result<usize>) {
462        let error = result.expect_err("expected a unique constraint violation");
463        assert!(
464            error.to_string().contains("UNIQUE constraint failed"),
465            "expected UNIQUE constraint failure, got {error}"
466        );
467    }
468
469    fn insert_bash_task(
470        conn: &Connection,
471        harness: &str,
472        session_id: &str,
473        task_id: &str,
474    ) -> rusqlite::Result<usize> {
475        conn.execute(
476            "INSERT INTO bash_tasks (
477                harness, session_id, task_id, project_key, command, cwd, status, started_at
478             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
479            params![
480                harness,
481                session_id,
482                task_id,
483                "project-key",
484                "echo ok",
485                "/tmp",
486                "running",
487                1_i64
488            ],
489        )
490    }
491
492    fn insert_backup(
493        conn: &Connection,
494        backup_id: &str,
495        order_blob: &[u8],
496    ) -> rusqlite::Result<usize> {
497        conn.execute(
498            "INSERT INTO backups (
499                backup_id, harness, session_id, project_key, order_blob, file_path,
500                path_hash, kind, created_at
501             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
502            params![
503                backup_id,
504                "opencode",
505                "session-1",
506                "project-key",
507                order_blob,
508                "/tmp/file.txt",
509                "path-hash",
510                "content",
511                1_i64
512            ],
513        )
514    }
515
516    fn order_blob(value: u128) -> [u8; 16] {
517        value.to_be_bytes()
518    }
519
520    fn backup_ids_ordered(conn: &Connection, direction: &str) -> Vec<String> {
521        let sql = match direction {
522            "ASC" => "SELECT backup_id FROM backups ORDER BY order_blob ASC",
523            "DESC" => "SELECT backup_id FROM backups ORDER BY order_blob DESC",
524            _ => panic!("unsupported order direction: {direction}"),
525        };
526        let mut stmt = conn.prepare(sql).unwrap();
527        stmt.query_map([], |row| row.get::<_, String>(0))
528            .unwrap()
529            .collect::<Result<Vec<_>, _>>()
530            .unwrap()
531    }
532}