1use rusqlite::{Connection, TransactionBehavior};
2use std::fmt;
3use std::fs;
4use std::path::Path;
5
6pub mod backups;
7pub mod bash_tasks;
8pub mod compression_events;
9pub mod state;
10
11pub const CURRENT_SCHEMA_VERSION: u32 = 2;
12
13const MIGRATION_V1: &str = r#"
14CREATE TABLE IF NOT EXISTS schema_version (
15 version INTEGER NOT NULL PRIMARY KEY
16);
17
18CREATE TABLE IF NOT EXISTS bash_tasks (
19 harness TEXT NOT NULL,
20 session_id TEXT NOT NULL,
21 task_id TEXT NOT NULL,
22 project_key TEXT NOT NULL,
23 command TEXT NOT NULL,
24 cwd TEXT NOT NULL,
25 status TEXT NOT NULL,
26 exit_code INTEGER,
27 pid INTEGER,
28 pgid INTEGER,
29 started_at INTEGER NOT NULL,
30 completed_at INTEGER,
31 stdout_path TEXT,
32 stderr_path TEXT,
33 compressed INTEGER NOT NULL DEFAULT 1,
34 timeout_ms INTEGER,
35 completion_delivered INTEGER NOT NULL DEFAULT 0,
36 output_bytes INTEGER,
37 metadata TEXT,
38 PRIMARY KEY (harness, session_id, task_id)
39);
40CREATE INDEX IF NOT EXISTS idx_bash_tasks_project_key ON bash_tasks(project_key);
41CREATE INDEX IF NOT EXISTS idx_bash_tasks_status ON bash_tasks(status);
42CREATE INDEX IF NOT EXISTS idx_bash_tasks_session_status ON bash_tasks(harness, session_id, status);
43
44CREATE TABLE IF NOT EXISTS compression_events (
45 id INTEGER PRIMARY KEY AUTOINCREMENT,
46 harness TEXT NOT NULL,
47 session_id TEXT,
48 project_key TEXT NOT NULL,
49 tool TEXT NOT NULL,
50 task_id TEXT,
51 command TEXT,
52 compressor TEXT NOT NULL,
53 original_bytes INTEGER NOT NULL,
54 compressed_bytes INTEGER NOT NULL,
55 original_tokens INTEGER NOT NULL,
56 compressed_tokens INTEGER NOT NULL,
57 created_at INTEGER NOT NULL
58);
59CREATE INDEX IF NOT EXISTS idx_compression_session ON compression_events(harness, session_id);
60CREATE INDEX IF NOT EXISTS idx_compression_session_created ON compression_events(harness, session_id, created_at);
61CREATE INDEX IF NOT EXISTS idx_compression_project_key ON compression_events(project_key);
62
63CREATE TABLE IF NOT EXISTS backups (
64 id INTEGER PRIMARY KEY AUTOINCREMENT,
65 backup_id TEXT,
66 harness TEXT NOT NULL,
67 session_id TEXT NOT NULL,
68 project_key TEXT NOT NULL,
69 op_id TEXT,
70 order_blob BLOB NOT NULL,
71 file_path TEXT NOT NULL,
72 path_hash TEXT NOT NULL,
73 backup_path TEXT,
74 kind TEXT NOT NULL,
75 description TEXT,
76 created_at INTEGER NOT NULL,
77 is_tombstone INTEGER NOT NULL DEFAULT 0
78);
79CREATE INDEX IF NOT EXISTS idx_backups_session_path ON backups(harness, session_id, path_hash);
80CREATE INDEX IF NOT EXISTS idx_backups_session_op ON backups(harness, session_id, op_id) WHERE op_id IS NOT NULL;
81CREATE INDEX IF NOT EXISTS idx_backups_session_order ON backups(harness, session_id, order_blob DESC);
82CREATE INDEX IF NOT EXISTS idx_backups_session_path_order ON backups(harness, session_id, path_hash, order_blob DESC);
83
84CREATE TABLE IF NOT EXISTS harness_state (
85 harness TEXT NOT NULL,
86 key TEXT NOT NULL,
87 value TEXT NOT NULL,
88 updated_at INTEGER NOT NULL,
89 PRIMARY KEY (harness, key)
90);
91
92CREATE TABLE IF NOT EXISTS host_state (
93 key TEXT NOT NULL PRIMARY KEY,
94 value TEXT NOT NULL,
95 updated_at INTEGER NOT NULL
96);
97"#;
98
99const MIGRATION_V2: &str = r#"
100DELETE FROM compression_events
101WHERE id NOT IN (
102 SELECT MIN(id)
103 FROM compression_events
104 GROUP BY
105 harness,
106 COALESCE(session_id, char(0)),
107 project_key,
108 tool,
109 COALESCE(task_id, char(0))
110);
111
112CREATE UNIQUE INDEX IF NOT EXISTS idx_compression_event_identity
113ON compression_events (
114 harness,
115 COALESCE(session_id, char(0)),
116 project_key,
117 tool,
118 COALESCE(task_id, char(0))
119);
120"#;
121
122#[derive(Debug)]
123pub enum OpenError {
124 Io(std::io::Error),
125 Sqlite(rusqlite::Error),
126 DowngradeRefused {
127 db_version: u32,
128 supported: u32,
129 },
130 MigrationFailed {
131 from: u32,
132 to: u32,
133 error: rusqlite::Error,
134 },
135}
136
137impl fmt::Display for OpenError {
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 match self {
140 OpenError::Io(error) => write!(f, "database I/O error: {error}"),
141 OpenError::Sqlite(error) => write!(f, "sqlite error: {error}"),
142 OpenError::DowngradeRefused {
143 db_version,
144 supported,
145 } => write!(
146 f,
147 "database schema version {db_version} is newer than supported version {supported}"
148 ),
149 OpenError::MigrationFailed { from, to, error } => {
150 write!(f, "database migration {from}->{to} failed: {error}")
151 }
152 }
153 }
154}
155
156impl std::error::Error for OpenError {}
157
158impl From<std::io::Error> for OpenError {
159 fn from(error: std::io::Error) -> Self {
160 OpenError::Io(error)
161 }
162}
163
164impl From<rusqlite::Error> for OpenError {
165 fn from(error: rusqlite::Error) -> Self {
166 OpenError::Sqlite(error)
167 }
168}
169
170pub fn open(path: &Path) -> Result<Connection, OpenError> {
176 if let Some(parent) = path.parent() {
177 if !parent.as_os_str().is_empty() {
178 fs::create_dir_all(parent)?;
179 }
180 }
181
182 let mut conn = Connection::open(path)?;
183 apply_pragmas(&conn)?;
184 run_migrations(&mut conn)?;
185 Ok(conn)
186}
187
188pub fn apply_pragmas(conn: &Connection) -> Result<(), rusqlite::Error> {
190 conn.pragma_update(None, "foreign_keys", "ON")?;
191 conn.pragma_update(None, "journal_mode", "WAL")?;
192 conn.pragma_update(None, "busy_timeout", 5000)?;
193 conn.pragma_update(None, "synchronous", "NORMAL")?;
194 Ok(())
195}
196
197pub fn run_migrations(conn: &mut Connection) -> Result<u32, OpenError> {
202 conn.execute_batch(
203 "CREATE TABLE IF NOT EXISTS schema_version (version INTEGER NOT NULL PRIMARY KEY);",
204 )?;
205
206 let db_version = current_schema_version(conn)?;
207 if db_version > CURRENT_SCHEMA_VERSION {
208 return Err(OpenError::DowngradeRefused {
209 db_version,
210 supported: CURRENT_SCHEMA_VERSION,
211 });
212 }
213
214 for version in (db_version + 1)..=CURRENT_SCHEMA_VERSION {
215 apply_migration(conn, version)?;
216 }
217
218 Ok(current_schema_version(conn)?)
219}
220
221fn current_schema_version(conn: &Connection) -> Result<u32, rusqlite::Error> {
222 conn.query_row(
223 "SELECT COALESCE(MAX(version), 0) FROM schema_version",
224 [],
225 |row| row.get::<_, u32>(0),
226 )
227}
228
229fn apply_migration(conn: &mut Connection, version: u32) -> Result<(), OpenError> {
230 let from = version - 1;
231 let tx = conn
232 .transaction_with_behavior(TransactionBehavior::Immediate)
233 .map_err(|error| OpenError::MigrationFailed {
234 from,
235 to: version,
236 error,
237 })?;
238
239 let result = match version {
240 1 => tx.execute_batch(MIGRATION_V1),
241 2 => tx.execute_batch(MIGRATION_V2),
242 _ => Ok(()),
243 }
244 .and_then(|()| {
245 tx.execute("DELETE FROM schema_version", [])?;
246 tx.execute(
247 "INSERT OR REPLACE INTO schema_version (version) VALUES (?1)",
248 [version],
249 )?;
250 tx.commit()
251 });
252
253 result.map_err(|error| OpenError::MigrationFailed {
254 from,
255 to: version,
256 error,
257 })
258}
259
260#[cfg(test)]
261mod tests {
262 use super::*;
263 use rusqlite::params;
264 use tempfile::tempdir;
265
266 const EXPECTED_TABLES: &[&str] = &[
267 "schema_version",
268 "bash_tasks",
269 "compression_events",
270 "backups",
271 "harness_state",
272 "host_state",
273 ];
274
275 const EXPECTED_INDEXES: &[&str] = &[
276 "idx_bash_tasks_project_key",
277 "idx_bash_tasks_status",
278 "idx_bash_tasks_session_status",
279 "idx_compression_session",
280 "idx_compression_session_created",
281 "idx_compression_project_key",
282 "idx_compression_event_identity",
283 "idx_backups_session_path",
284 "idx_backups_session_op",
285 "idx_backups_session_order",
286 "idx_backups_session_path_order",
287 ];
288
289 #[test]
290 fn open_fresh_db_creates_all_tables() {
291 let dir = tempdir().unwrap();
292 let conn = open(&dir.path().join("aft.db")).unwrap();
293
294 let tables = sqlite_names(&conn, "table");
295 for table in EXPECTED_TABLES {
296 assert!(tables.contains(&table.to_string()), "missing table {table}");
297 }
298 }
299
300 #[test]
301 fn open_fresh_db_creates_all_indexes() {
302 let dir = tempdir().unwrap();
303 let conn = open(&dir.path().join("aft.db")).unwrap();
304
305 let indexes = sqlite_names(&conn, "index");
306 for index in EXPECTED_INDEXES {
307 assert!(
308 indexes.contains(&index.to_string()),
309 "missing index {index}"
310 );
311 }
312 }
313
314 #[test]
315 fn open_existing_db_is_idempotent() {
316 let dir = tempdir().unwrap();
317 let path = dir.path().join("aft.db");
318
319 let conn = open(&path).unwrap();
320 let first_version = schema_version(&conn);
321 drop(conn);
322
323 let conn = open(&path).unwrap();
324 assert_eq!(schema_version(&conn), first_version);
325 }
326
327 #[test]
328 fn pragmas_applied_correctly() {
329 let dir = tempdir().unwrap();
330 let conn = open(&dir.path().join("aft.db")).unwrap();
331
332 let foreign_keys: i64 = conn
333 .query_row("PRAGMA foreign_keys", [], |row| row.get(0))
334 .unwrap();
335 let journal_mode: String = conn
336 .query_row("PRAGMA journal_mode", [], |row| row.get(0))
337 .unwrap();
338 let busy_timeout: i64 = conn
339 .query_row("PRAGMA busy_timeout", [], |row| row.get(0))
340 .unwrap();
341 let synchronous: i64 = conn
342 .query_row("PRAGMA synchronous", [], |row| row.get(0))
343 .unwrap();
344
345 assert_eq!(foreign_keys, 1);
346 assert_eq!(journal_mode, "wal");
347 assert_eq!(busy_timeout, 5000);
348 assert_eq!(synchronous, 1);
349 }
350
351 #[test]
352 fn downgrade_refused() {
353 let dir = tempdir().unwrap();
354 let path = dir.path().join("aft.db");
355 let conn = open(&path).unwrap();
356 conn.execute("INSERT OR REPLACE INTO schema_version VALUES (999)", [])
357 .unwrap();
358 drop(conn);
359
360 match open(&path).unwrap_err() {
361 OpenError::DowngradeRefused {
362 db_version,
363 supported,
364 } => {
365 assert_eq!(db_version, 999);
366 assert_eq!(supported, CURRENT_SCHEMA_VERSION);
367 }
368 error => panic!("expected downgrade refusal, got {error:?}"),
369 }
370 }
371
372 #[test]
373 fn migration_runner_advances_version() {
374 let dir = tempdir().unwrap();
375 let conn = open(&dir.path().join("aft.db")).unwrap();
376
377 assert_eq!(schema_version(&conn), CURRENT_SCHEMA_VERSION);
378 }
379
380 #[test]
381 fn migration_v2_deduplicates_compression_events_and_adds_unique_index() {
382 let dir = tempdir().unwrap();
383 let path = dir.path().join("aft.db");
384
385 let conn = Connection::open(&path).unwrap();
386 conn.execute_batch(MIGRATION_V1).unwrap();
387 conn.execute("DELETE FROM schema_version", []).unwrap();
388 conn.execute("INSERT INTO schema_version (version) VALUES (1)", [])
389 .unwrap();
390 insert_compression_event(
391 &conn,
392 1,
393 "opencode",
394 Some("session-1"),
395 "project-key",
396 "bash",
397 Some("task-1"),
398 )
399 .unwrap();
400 insert_compression_event(
401 &conn,
402 2,
403 "opencode",
404 Some("session-1"),
405 "project-key",
406 "bash",
407 Some("task-1"),
408 )
409 .unwrap();
410 insert_compression_event(&conn, 3, "opencode", None, "project-key", "bash", None).unwrap();
411 drop(conn);
412
413 let conn = open(&path).unwrap();
414
415 assert_eq!(schema_version(&conn), CURRENT_SCHEMA_VERSION);
416 let ids = compression_event_ids(&conn);
417 assert_eq!(ids, vec![1, 3]);
418 let indexes = sqlite_names(&conn, "index");
419 assert!(
420 indexes.contains(&"idx_compression_event_identity".to_string()),
421 "missing v2 unique compression event identity index"
422 );
423 assert_unique_constraint(insert_compression_event(
424 &conn,
425 4,
426 "opencode",
427 Some("session-1"),
428 "project-key",
429 "bash",
430 Some("task-1"),
431 ));
432 }
433
434 #[test]
435 fn migration_runner_no_op_when_current() {
436 let dir = tempdir().unwrap();
437 let path = dir.path().join("aft.db");
438
439 let conn = open(&path).unwrap();
440 assert_eq!(schema_version_row_count(&conn), 1);
441 drop(conn);
442
443 let conn = open(&path).unwrap();
444 assert_eq!(schema_version(&conn), CURRENT_SCHEMA_VERSION);
445 assert_eq!(schema_version_row_count(&conn), 1);
446 }
447
448 #[test]
449 fn harness_state_compound_pk_works() {
450 let dir = tempdir().unwrap();
451 let conn = open(&dir.path().join("aft.db")).unwrap();
452
453 conn.execute(
454 "INSERT INTO harness_state (harness, key, value, updated_at) VALUES (?1, ?2, ?3, ?4)",
455 params!["opencode", "warned_tools", "{}", 1_i64],
456 )
457 .unwrap();
458 let duplicate = conn.execute(
459 "INSERT INTO harness_state (harness, key, value, updated_at) VALUES (?1, ?2, ?3, ?4)",
460 params!["opencode", "warned_tools", "{}", 2_i64],
461 );
462 assert_unique_constraint(duplicate);
463
464 conn.execute(
465 "INSERT INTO harness_state (harness, key, value, updated_at) VALUES (?1, ?2, ?3, ?4)",
466 params!["pi", "warned_tools", "{}", 3_i64],
467 )
468 .unwrap();
469 }
470
471 #[test]
472 fn host_state_simple_pk_works() {
473 let dir = tempdir().unwrap();
474 let conn = open(&dir.path().join("aft.db")).unwrap();
475
476 conn.execute(
477 "INSERT INTO host_state (key, value, updated_at) VALUES (?1, ?2, ?3)",
478 params!["trusted_filter_projects", "[]", 1_i64],
479 )
480 .unwrap();
481 let duplicate = conn.execute(
482 "INSERT INTO host_state (key, value, updated_at) VALUES (?1, ?2, ?3)",
483 params!["trusted_filter_projects", "[]", 2_i64],
484 );
485 assert_unique_constraint(duplicate);
486 }
487
488 #[test]
489 fn bash_tasks_compound_pk_works() {
490 let dir = tempdir().unwrap();
491 let conn = open(&dir.path().join("aft.db")).unwrap();
492
493 insert_bash_task(&conn, "opencode", "session-1", "bash-12345678").unwrap();
494 let duplicate = insert_bash_task(&conn, "opencode", "session-1", "bash-12345678");
495 assert_unique_constraint(duplicate);
496
497 insert_bash_task(&conn, "pi", "session-1", "bash-12345678").unwrap();
498 }
499
500 #[test]
501 fn backups_order_blob_sort() {
502 let dir = tempdir().unwrap();
503 let conn = open(&dir.path().join("aft.db")).unwrap();
504
505 let one = order_blob(1);
506 let two = order_blob(2);
507 let max = [0xFF; 16];
508
509 insert_backup(&conn, "one", &one).unwrap();
510 insert_backup(&conn, "two", &two).unwrap();
511 insert_backup(&conn, "max", &max).unwrap();
512
513 assert_eq!(backup_ids_ordered(&conn, "ASC"), vec!["one", "two", "max"]);
514 assert_eq!(backup_ids_ordered(&conn, "DESC"), vec!["max", "two", "one"]);
515 }
516
517 fn sqlite_names(conn: &Connection, kind: &str) -> Vec<String> {
518 let sql = match kind {
519 "table" => "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name",
520 "index" => "SELECT name FROM sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%' ORDER BY name",
521 _ => panic!("unsupported sqlite_master kind: {kind}"),
522 };
523 let mut stmt = conn.prepare(sql).unwrap();
524 stmt.query_map([], |row| row.get::<_, String>(0))
525 .unwrap()
526 .collect::<Result<Vec<_>, _>>()
527 .unwrap()
528 }
529
530 fn schema_version(conn: &Connection) -> u32 {
531 conn.query_row("SELECT version FROM schema_version", [], |row| row.get(0))
532 .unwrap()
533 }
534
535 fn schema_version_row_count(conn: &Connection) -> i64 {
536 conn.query_row("SELECT COUNT(*) FROM schema_version", [], |row| row.get(0))
537 .unwrap()
538 }
539
540 fn assert_unique_constraint(result: rusqlite::Result<usize>) {
541 let error = result.expect_err("expected a unique constraint violation");
542 assert!(
543 error.to_string().contains("UNIQUE constraint failed"),
544 "expected UNIQUE constraint failure, got {error}"
545 );
546 }
547
548 fn insert_bash_task(
549 conn: &Connection,
550 harness: &str,
551 session_id: &str,
552 task_id: &str,
553 ) -> rusqlite::Result<usize> {
554 conn.execute(
555 "INSERT INTO bash_tasks (
556 harness, session_id, task_id, project_key, command, cwd, status, started_at
557 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
558 params![
559 harness,
560 session_id,
561 task_id,
562 "project-key",
563 "echo ok",
564 "/tmp",
565 "running",
566 1_i64
567 ],
568 )
569 }
570
571 fn insert_compression_event(
572 conn: &Connection,
573 id: i64,
574 harness: &str,
575 session_id: Option<&str>,
576 project_key: &str,
577 tool: &str,
578 task_id: Option<&str>,
579 ) -> rusqlite::Result<usize> {
580 conn.execute(
581 "INSERT INTO compression_events (
582 id, harness, session_id, project_key, tool, task_id, command, compressor,
583 original_bytes, compressed_bytes, original_tokens, compressed_tokens, created_at
584 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
585 params![
586 id,
587 harness,
588 session_id,
589 project_key,
590 tool,
591 task_id,
592 "echo ok",
593 "test-compressor",
594 100_i64,
595 50_i64,
596 20_i64,
597 10_i64,
598 id
599 ],
600 )
601 }
602
603 fn compression_event_ids(conn: &Connection) -> Vec<i64> {
604 let mut stmt = conn
605 .prepare("SELECT id FROM compression_events ORDER BY id")
606 .unwrap();
607 stmt.query_map([], |row| row.get::<_, i64>(0))
608 .unwrap()
609 .collect::<Result<Vec<_>, _>>()
610 .unwrap()
611 }
612
613 fn insert_backup(
614 conn: &Connection,
615 backup_id: &str,
616 order_blob: &[u8],
617 ) -> rusqlite::Result<usize> {
618 conn.execute(
619 "INSERT INTO backups (
620 backup_id, harness, session_id, project_key, order_blob, file_path,
621 path_hash, kind, created_at
622 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
623 params![
624 backup_id,
625 "opencode",
626 "session-1",
627 "project-key",
628 order_blob,
629 "/tmp/file.txt",
630 "path-hash",
631 "content",
632 1_i64
633 ],
634 )
635 }
636
637 fn order_blob(value: u128) -> [u8; 16] {
638 value.to_be_bytes()
639 }
640
641 fn backup_ids_ordered(conn: &Connection, direction: &str) -> Vec<String> {
642 let sql = match direction {
643 "ASC" => "SELECT backup_id FROM backups ORDER BY order_blob ASC",
644 "DESC" => "SELECT backup_id FROM backups ORDER BY order_blob DESC",
645 _ => panic!("unsupported order direction: {direction}"),
646 };
647 let mut stmt = conn.prepare(sql).unwrap();
648 stmt.query_map([], |row| row.get::<_, String>(0))
649 .unwrap()
650 .collect::<Result<Vec<_>, _>>()
651 .unwrap()
652 }
653}