orbok_workers/
recovery.rs1use orbok_core::{OrbokResult, now_iso8601};
9use orbok_db::Catalog;
10use std::path::Path;
11
/// Summary of what [`run_startup_recovery`] did.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RecoveryReport {
    /// Number of `index_jobs` rows that were in `running` state and were reset to `queued`.
    pub jobs_reset: u64,
    /// Number of `index_jobs` rows in `queued` state after the reset (includes the reset ones).
    pub jobs_pending: u64,
    /// `true` when no cache database file existed at the configured path.
    /// NOTE(review): recovery itself does not create the file — presumably done elsewhere.
    pub cache_recreated: bool,
    /// `true` when an existing cache database could not be opened or failed
    /// `PRAGMA integrity_check` and was moved out of the way.
    pub cache_rebuilt: bool,
}
24
25pub fn run_startup_recovery(
29 catalog: &Catalog,
30 cache_db_path: &Path,
31) -> OrbokResult<RecoveryReport> {
32 let mut report = RecoveryReport::default();
33 report.jobs_reset = reset_interrupted_jobs(catalog)?;
34 report.jobs_pending = count_pending_jobs(catalog)?;
35 let cache_status = ensure_cache_db(cache_db_path)?;
36 report.cache_recreated = cache_status == CacheDbStatus::Recreated;
37 report.cache_rebuilt = cache_status == CacheDbStatus::Rebuilt;
38 if report.jobs_reset > 0 {
39 tracing::warn!(reset = report.jobs_reset, "reset interrupted jobs to queued on startup");
40 }
41 Ok(report)
42}
43
44fn reset_interrupted_jobs(catalog: &Catalog) -> OrbokResult<u64> {
47 let conn = catalog.lock();
48 let n = conn
49 .execute(
50 "UPDATE index_jobs SET status = 'queued', updated_at = ?1 WHERE status = 'running'",
51 rusqlite::params![now_iso8601()],
52 )
53 .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
54 Ok(n as u64)
55}
56
57fn count_pending_jobs(catalog: &Catalog) -> OrbokResult<u64> {
58 let conn = catalog.lock();
59 let n: i64 = conn
60 .query_row("SELECT COUNT(*) FROM index_jobs WHERE status = 'queued'", [], |r| r.get(0))
61 .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
62 Ok(n as u64)
63}
64
/// Outcome of the startup check of the cache database (see `ensure_cache_db`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CacheDbStatus {
    /// An existing cache DB opened and `PRAGMA integrity_check` reported `ok`.
    Ok,
    /// No cache DB file existed at the checked path.
    Recreated,
    /// The cache DB could not be opened or failed the integrity check and was moved aside.
    Rebuilt,
}
71
72fn ensure_cache_db(path: &Path) -> OrbokResult<CacheDbStatus> {
75 if !path.exists() {
76 return Ok(CacheDbStatus::Recreated);
78 }
79 match rusqlite::Connection::open(path) {
81 Ok(conn) => {
82 let result: String = conn
83 .query_row("PRAGMA integrity_check", [], |r| r.get(0))
84 .unwrap_or_else(|_| "error".to_string());
85 if result != "ok" {
86 tracing::error!(path = %path.display(), "cache DB corrupt — backing up and removing");
87 let backup = path.with_extension("sqlite3.corrupt-backup");
88 let _ = std::fs::rename(path, &backup);
89 return Ok(CacheDbStatus::Rebuilt);
90 }
91 }
92 Err(e) => {
93 tracing::error!(path = %path.display(), error = %e, "cache DB unreadable");
94 let backup = path.with_extension("sqlite3.corrupt-backup");
95 let _ = std::fs::rename(path, &backup);
96 return Ok(CacheDbStatus::Rebuilt);
97 }
98 }
99 Ok(CacheDbStatus::Ok)
100}
101
/// Counts of dangling references found by [`check_catalog_integrity`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct IntegrityReport {
    /// `chunks` rows with a non-NULL `parent_chunk_id` that matches no existing `chunks.chunk_id`.
    pub orphaned_child_chunks: u64,
    /// `keyword_index_records` rows whose `chunk_id` matches no existing chunk.
    pub orphaned_kw_records: u64,
    /// `embeddings` rows whose `chunk_id` matches no existing chunk.
    pub orphaned_embedding_records: u64,
    /// `files` rows whose `source_id` matches no existing `sources.source_id`.
    pub orphaned_files: u64,
}
114
115impl IntegrityReport {
116 pub fn is_clean(&self) -> bool {
117 self.orphaned_child_chunks == 0
118 && self.orphaned_kw_records == 0
119 && self.orphaned_embedding_records == 0
120 && self.orphaned_files == 0
121 }
122}
123
124pub fn check_catalog_integrity(catalog: &Catalog) -> OrbokResult<IntegrityReport> {
127 let conn = catalog.lock();
128 let mut report = IntegrityReport::default();
129 let q = |sql: &str| -> OrbokResult<u64> {
130 let n: i64 = conn
131 .query_row(sql, [], |r| r.get(0))
132 .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
133 Ok(n as u64)
134 };
135 report.orphaned_child_chunks = q(
136 "SELECT COUNT(*) FROM chunks c \
137 WHERE c.parent_chunk_id IS NOT NULL \
138 AND NOT EXISTS (SELECT 1 FROM chunks p WHERE p.chunk_id = c.parent_chunk_id)",
139 )?;
140 report.orphaned_kw_records = q(
141 "SELECT COUNT(*) FROM keyword_index_records k \
142 WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.chunk_id = k.chunk_id)",
143 )?;
144 report.orphaned_embedding_records = q(
145 "SELECT COUNT(*) FROM embeddings e \
146 WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.chunk_id = e.chunk_id)",
147 )?;
148 report.orphaned_files = q(
149 "SELECT COUNT(*) FROM files f \
150 WHERE NOT EXISTS (SELECT 1 FROM sources s WHERE s.source_id = f.source_id)",
151 )?;
152 Ok(report)
153}