1use rusqlite::{Connection, TransactionBehavior};
2use std::fmt;
3use std::fs;
4use std::path::Path;
5
6pub mod backups;
7pub mod bash_tasks;
8pub mod compression_events;
9pub mod state;
10
11pub const CURRENT_SCHEMA_VERSION: u32 = 1;
12
13const MIGRATION_V1: &str = r#"
14CREATE TABLE IF NOT EXISTS schema_version (
15 version INTEGER NOT NULL PRIMARY KEY
16);
17
18CREATE TABLE IF NOT EXISTS bash_tasks (
19 harness TEXT NOT NULL,
20 session_id TEXT NOT NULL,
21 task_id TEXT NOT NULL,
22 project_key TEXT NOT NULL,
23 command TEXT NOT NULL,
24 cwd TEXT NOT NULL,
25 status TEXT NOT NULL,
26 exit_code INTEGER,
27 pid INTEGER,
28 pgid INTEGER,
29 started_at INTEGER NOT NULL,
30 completed_at INTEGER,
31 stdout_path TEXT,
32 stderr_path TEXT,
33 compressed INTEGER NOT NULL DEFAULT 1,
34 timeout_ms INTEGER,
35 completion_delivered INTEGER NOT NULL DEFAULT 0,
36 output_bytes INTEGER,
37 metadata TEXT,
38 PRIMARY KEY (harness, session_id, task_id)
39);
40CREATE INDEX IF NOT EXISTS idx_bash_tasks_project_key ON bash_tasks(project_key);
41CREATE INDEX IF NOT EXISTS idx_bash_tasks_status ON bash_tasks(status);
42CREATE INDEX IF NOT EXISTS idx_bash_tasks_session_status ON bash_tasks(harness, session_id, status);
43
44CREATE TABLE IF NOT EXISTS compression_events (
45 id INTEGER PRIMARY KEY AUTOINCREMENT,
46 harness TEXT NOT NULL,
47 session_id TEXT,
48 project_key TEXT NOT NULL,
49 tool TEXT NOT NULL,
50 task_id TEXT,
51 command TEXT,
52 compressor TEXT NOT NULL,
53 original_bytes INTEGER NOT NULL,
54 compressed_bytes INTEGER NOT NULL,
55 original_tokens INTEGER NOT NULL,
56 compressed_tokens INTEGER NOT NULL,
57 created_at INTEGER NOT NULL
58);
59CREATE INDEX IF NOT EXISTS idx_compression_session ON compression_events(harness, session_id);
60CREATE INDEX IF NOT EXISTS idx_compression_session_created ON compression_events(harness, session_id, created_at);
61CREATE INDEX IF NOT EXISTS idx_compression_project_key ON compression_events(project_key);
62
63CREATE TABLE IF NOT EXISTS backups (
64 id INTEGER PRIMARY KEY AUTOINCREMENT,
65 backup_id TEXT,
66 harness TEXT NOT NULL,
67 session_id TEXT NOT NULL,
68 project_key TEXT NOT NULL,
69 op_id TEXT,
70 order_blob BLOB NOT NULL,
71 file_path TEXT NOT NULL,
72 path_hash TEXT NOT NULL,
73 backup_path TEXT,
74 kind TEXT NOT NULL,
75 description TEXT,
76 created_at INTEGER NOT NULL,
77 is_tombstone INTEGER NOT NULL DEFAULT 0
78);
79CREATE INDEX IF NOT EXISTS idx_backups_session_path ON backups(harness, session_id, path_hash);
80CREATE INDEX IF NOT EXISTS idx_backups_session_op ON backups(harness, session_id, op_id) WHERE op_id IS NOT NULL;
81CREATE INDEX IF NOT EXISTS idx_backups_session_order ON backups(harness, session_id, order_blob DESC);
82CREATE INDEX IF NOT EXISTS idx_backups_session_path_order ON backups(harness, session_id, path_hash, order_blob DESC);
83
84CREATE TABLE IF NOT EXISTS harness_state (
85 harness TEXT NOT NULL,
86 key TEXT NOT NULL,
87 value TEXT NOT NULL,
88 updated_at INTEGER NOT NULL,
89 PRIMARY KEY (harness, key)
90);
91
92CREATE TABLE IF NOT EXISTS host_state (
93 key TEXT NOT NULL PRIMARY KEY,
94 value TEXT NOT NULL,
95 updated_at INTEGER NOT NULL
96);
97"#;
98
99#[derive(Debug)]
100pub enum OpenError {
101 Io(std::io::Error),
102 Sqlite(rusqlite::Error),
103 DowngradeRefused {
104 db_version: u32,
105 supported: u32,
106 },
107 MigrationFailed {
108 from: u32,
109 to: u32,
110 error: rusqlite::Error,
111 },
112}
113
114impl fmt::Display for OpenError {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 match self {
117 OpenError::Io(error) => write!(f, "database I/O error: {error}"),
118 OpenError::Sqlite(error) => write!(f, "sqlite error: {error}"),
119 OpenError::DowngradeRefused {
120 db_version,
121 supported,
122 } => write!(
123 f,
124 "database schema version {db_version} is newer than supported version {supported}"
125 ),
126 OpenError::MigrationFailed { from, to, error } => {
127 write!(f, "database migration {from}->{to} failed: {error}")
128 }
129 }
130 }
131}
132
133impl std::error::Error for OpenError {}
134
135impl From<std::io::Error> for OpenError {
136 fn from(error: std::io::Error) -> Self {
137 OpenError::Io(error)
138 }
139}
140
141impl From<rusqlite::Error> for OpenError {
142 fn from(error: rusqlite::Error) -> Self {
143 OpenError::Sqlite(error)
144 }
145}
146
147pub fn open(path: &Path) -> Result<Connection, OpenError> {
153 if let Some(parent) = path.parent() {
154 if !parent.as_os_str().is_empty() {
155 fs::create_dir_all(parent)?;
156 }
157 }
158
159 let mut conn = Connection::open(path)?;
160 apply_pragmas(&conn)?;
161 run_migrations(&mut conn)?;
162 Ok(conn)
163}
164
165pub fn apply_pragmas(conn: &Connection) -> Result<(), rusqlite::Error> {
167 conn.pragma_update(None, "foreign_keys", "ON")?;
168 conn.pragma_update(None, "journal_mode", "WAL")?;
169 conn.pragma_update(None, "busy_timeout", 5000)?;
170 conn.pragma_update(None, "synchronous", "NORMAL")?;
171 Ok(())
172}
173
174pub fn run_migrations(conn: &mut Connection) -> Result<u32, OpenError> {
179 conn.execute_batch(
180 "CREATE TABLE IF NOT EXISTS schema_version (version INTEGER NOT NULL PRIMARY KEY);",
181 )?;
182
183 let db_version = current_schema_version(conn)?;
184 if db_version > CURRENT_SCHEMA_VERSION {
185 return Err(OpenError::DowngradeRefused {
186 db_version,
187 supported: CURRENT_SCHEMA_VERSION,
188 });
189 }
190
191 for version in (db_version + 1)..=CURRENT_SCHEMA_VERSION {
192 apply_migration(conn, version)?;
193 }
194
195 Ok(current_schema_version(conn)?)
196}
197
198fn current_schema_version(conn: &Connection) -> Result<u32, rusqlite::Error> {
199 conn.query_row(
200 "SELECT COALESCE(MAX(version), 0) FROM schema_version",
201 [],
202 |row| row.get::<_, u32>(0),
203 )
204}
205
206fn apply_migration(conn: &mut Connection, version: u32) -> Result<(), OpenError> {
207 let from = version - 1;
208 let tx = conn
209 .transaction_with_behavior(TransactionBehavior::Immediate)
210 .map_err(|error| OpenError::MigrationFailed {
211 from,
212 to: version,
213 error,
214 })?;
215
216 let result = match version {
217 1 => tx.execute_batch(MIGRATION_V1),
218 _ => Ok(()),
219 }
220 .and_then(|()| {
221 tx.execute("DELETE FROM schema_version", [])?;
222 tx.execute(
223 "INSERT OR REPLACE INTO schema_version (version) VALUES (?1)",
224 [version],
225 )?;
226 tx.commit()
227 });
228
229 result.map_err(|error| OpenError::MigrationFailed {
230 from,
231 to: version,
232 error,
233 })
234}
235
236#[cfg(test)]
237mod tests {
238 use super::*;
239 use rusqlite::params;
240 use tempfile::tempdir;
241
242 const EXPECTED_TABLES: &[&str] = &[
243 "schema_version",
244 "bash_tasks",
245 "compression_events",
246 "backups",
247 "harness_state",
248 "host_state",
249 ];
250
251 const EXPECTED_INDEXES: &[&str] = &[
252 "idx_bash_tasks_project_key",
253 "idx_bash_tasks_status",
254 "idx_bash_tasks_session_status",
255 "idx_compression_session",
256 "idx_compression_session_created",
257 "idx_compression_project_key",
258 "idx_backups_session_path",
259 "idx_backups_session_op",
260 "idx_backups_session_order",
261 "idx_backups_session_path_order",
262 ];
263
264 #[test]
265 fn open_fresh_db_creates_all_tables() {
266 let dir = tempdir().unwrap();
267 let conn = open(&dir.path().join("aft.db")).unwrap();
268
269 let tables = sqlite_names(&conn, "table");
270 for table in EXPECTED_TABLES {
271 assert!(tables.contains(&table.to_string()), "missing table {table}");
272 }
273 }
274
275 #[test]
276 fn open_fresh_db_creates_all_indexes() {
277 let dir = tempdir().unwrap();
278 let conn = open(&dir.path().join("aft.db")).unwrap();
279
280 let indexes = sqlite_names(&conn, "index");
281 for index in EXPECTED_INDEXES {
282 assert!(
283 indexes.contains(&index.to_string()),
284 "missing index {index}"
285 );
286 }
287 }
288
289 #[test]
290 fn open_existing_db_is_idempotent() {
291 let dir = tempdir().unwrap();
292 let path = dir.path().join("aft.db");
293
294 let conn = open(&path).unwrap();
295 let first_version = schema_version(&conn);
296 drop(conn);
297
298 let conn = open(&path).unwrap();
299 assert_eq!(schema_version(&conn), first_version);
300 }
301
302 #[test]
303 fn pragmas_applied_correctly() {
304 let dir = tempdir().unwrap();
305 let conn = open(&dir.path().join("aft.db")).unwrap();
306
307 let foreign_keys: i64 = conn
308 .query_row("PRAGMA foreign_keys", [], |row| row.get(0))
309 .unwrap();
310 let journal_mode: String = conn
311 .query_row("PRAGMA journal_mode", [], |row| row.get(0))
312 .unwrap();
313 let busy_timeout: i64 = conn
314 .query_row("PRAGMA busy_timeout", [], |row| row.get(0))
315 .unwrap();
316 let synchronous: i64 = conn
317 .query_row("PRAGMA synchronous", [], |row| row.get(0))
318 .unwrap();
319
320 assert_eq!(foreign_keys, 1);
321 assert_eq!(journal_mode, "wal");
322 assert_eq!(busy_timeout, 5000);
323 assert_eq!(synchronous, 1);
324 }
325
326 #[test]
327 fn downgrade_refused() {
328 let dir = tempdir().unwrap();
329 let path = dir.path().join("aft.db");
330 let conn = open(&path).unwrap();
331 conn.execute("INSERT OR REPLACE INTO schema_version VALUES (999)", [])
332 .unwrap();
333 drop(conn);
334
335 match open(&path).unwrap_err() {
336 OpenError::DowngradeRefused {
337 db_version,
338 supported,
339 } => {
340 assert_eq!(db_version, 999);
341 assert_eq!(supported, CURRENT_SCHEMA_VERSION);
342 }
343 error => panic!("expected downgrade refusal, got {error:?}"),
344 }
345 }
346
347 #[test]
348 fn migration_runner_advances_version() {
349 let dir = tempdir().unwrap();
350 let conn = open(&dir.path().join("aft.db")).unwrap();
351
352 assert_eq!(schema_version(&conn), CURRENT_SCHEMA_VERSION);
353 }
354
355 #[test]
356 fn migration_runner_no_op_when_current() {
357 let dir = tempdir().unwrap();
358 let path = dir.path().join("aft.db");
359
360 let conn = open(&path).unwrap();
361 assert_eq!(schema_version_row_count(&conn), 1);
362 drop(conn);
363
364 let conn = open(&path).unwrap();
365 assert_eq!(schema_version(&conn), CURRENT_SCHEMA_VERSION);
366 assert_eq!(schema_version_row_count(&conn), 1);
367 }
368
369 #[test]
370 fn harness_state_compound_pk_works() {
371 let dir = tempdir().unwrap();
372 let conn = open(&dir.path().join("aft.db")).unwrap();
373
374 conn.execute(
375 "INSERT INTO harness_state (harness, key, value, updated_at) VALUES (?1, ?2, ?3, ?4)",
376 params!["opencode", "warned_tools", "{}", 1_i64],
377 )
378 .unwrap();
379 let duplicate = conn.execute(
380 "INSERT INTO harness_state (harness, key, value, updated_at) VALUES (?1, ?2, ?3, ?4)",
381 params!["opencode", "warned_tools", "{}", 2_i64],
382 );
383 assert_unique_constraint(duplicate);
384
385 conn.execute(
386 "INSERT INTO harness_state (harness, key, value, updated_at) VALUES (?1, ?2, ?3, ?4)",
387 params!["pi", "warned_tools", "{}", 3_i64],
388 )
389 .unwrap();
390 }
391
392 #[test]
393 fn host_state_simple_pk_works() {
394 let dir = tempdir().unwrap();
395 let conn = open(&dir.path().join("aft.db")).unwrap();
396
397 conn.execute(
398 "INSERT INTO host_state (key, value, updated_at) VALUES (?1, ?2, ?3)",
399 params!["trusted_filter_projects", "[]", 1_i64],
400 )
401 .unwrap();
402 let duplicate = conn.execute(
403 "INSERT INTO host_state (key, value, updated_at) VALUES (?1, ?2, ?3)",
404 params!["trusted_filter_projects", "[]", 2_i64],
405 );
406 assert_unique_constraint(duplicate);
407 }
408
409 #[test]
410 fn bash_tasks_compound_pk_works() {
411 let dir = tempdir().unwrap();
412 let conn = open(&dir.path().join("aft.db")).unwrap();
413
414 insert_bash_task(&conn, "opencode", "session-1", "bash-12345678").unwrap();
415 let duplicate = insert_bash_task(&conn, "opencode", "session-1", "bash-12345678");
416 assert_unique_constraint(duplicate);
417
418 insert_bash_task(&conn, "pi", "session-1", "bash-12345678").unwrap();
419 }
420
421 #[test]
422 fn backups_order_blob_sort() {
423 let dir = tempdir().unwrap();
424 let conn = open(&dir.path().join("aft.db")).unwrap();
425
426 let one = order_blob(1);
427 let two = order_blob(2);
428 let max = [0xFF; 16];
429
430 insert_backup(&conn, "one", &one).unwrap();
431 insert_backup(&conn, "two", &two).unwrap();
432 insert_backup(&conn, "max", &max).unwrap();
433
434 assert_eq!(backup_ids_ordered(&conn, "ASC"), vec!["one", "two", "max"]);
435 assert_eq!(backup_ids_ordered(&conn, "DESC"), vec!["max", "two", "one"]);
436 }
437
438 fn sqlite_names(conn: &Connection, kind: &str) -> Vec<String> {
439 let sql = match kind {
440 "table" => "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name",
441 "index" => "SELECT name FROM sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%' ORDER BY name",
442 _ => panic!("unsupported sqlite_master kind: {kind}"),
443 };
444 let mut stmt = conn.prepare(sql).unwrap();
445 stmt.query_map([], |row| row.get::<_, String>(0))
446 .unwrap()
447 .collect::<Result<Vec<_>, _>>()
448 .unwrap()
449 }
450
451 fn schema_version(conn: &Connection) -> u32 {
452 conn.query_row("SELECT version FROM schema_version", [], |row| row.get(0))
453 .unwrap()
454 }
455
456 fn schema_version_row_count(conn: &Connection) -> i64 {
457 conn.query_row("SELECT COUNT(*) FROM schema_version", [], |row| row.get(0))
458 .unwrap()
459 }
460
461 fn assert_unique_constraint(result: rusqlite::Result<usize>) {
462 let error = result.expect_err("expected a unique constraint violation");
463 assert!(
464 error.to_string().contains("UNIQUE constraint failed"),
465 "expected UNIQUE constraint failure, got {error}"
466 );
467 }
468
469 fn insert_bash_task(
470 conn: &Connection,
471 harness: &str,
472 session_id: &str,
473 task_id: &str,
474 ) -> rusqlite::Result<usize> {
475 conn.execute(
476 "INSERT INTO bash_tasks (
477 harness, session_id, task_id, project_key, command, cwd, status, started_at
478 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
479 params![
480 harness,
481 session_id,
482 task_id,
483 "project-key",
484 "echo ok",
485 "/tmp",
486 "running",
487 1_i64
488 ],
489 )
490 }
491
492 fn insert_backup(
493 conn: &Connection,
494 backup_id: &str,
495 order_blob: &[u8],
496 ) -> rusqlite::Result<usize> {
497 conn.execute(
498 "INSERT INTO backups (
499 backup_id, harness, session_id, project_key, order_blob, file_path,
500 path_hash, kind, created_at
501 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
502 params![
503 backup_id,
504 "opencode",
505 "session-1",
506 "project-key",
507 order_blob,
508 "/tmp/file.txt",
509 "path-hash",
510 "content",
511 1_i64
512 ],
513 )
514 }
515
516 fn order_blob(value: u128) -> [u8; 16] {
517 value.to_be_bytes()
518 }
519
520 fn backup_ids_ordered(conn: &Connection, direction: &str) -> Vec<String> {
521 let sql = match direction {
522 "ASC" => "SELECT backup_id FROM backups ORDER BY order_blob ASC",
523 "DESC" => "SELECT backup_id FROM backups ORDER BY order_blob DESC",
524 _ => panic!("unsupported order direction: {direction}"),
525 };
526 let mut stmt = conn.prepare(sql).unwrap();
527 stmt.query_map([], |row| row.get::<_, String>(0))
528 .unwrap()
529 .collect::<Result<Vec<_>, _>>()
530 .unwrap()
531 }
532}