Skip to main content

orbok_workers/
recovery.rs

1//! Crash recovery (RFC-018): detects and repairs interrupted state
2//! left by a previous session that terminated abnormally.
3//!
4//! Called at startup before any work begins. All repairs are non-destructive:
5//! running jobs are reset to queued (not deleted), and the previous active
6//! index is preserved (RFC-006 §12 replace-on-success guarantee).
7
8use orbok_core::{OrbokResult, now_iso8601};
9use orbok_db::Catalog;
10use std::path::Path;
11
/// Summary of what the startup recovery scan found and repaired
/// (RFC-018 §16 requirements).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RecoveryReport {
    /// Jobs that were `running` and reset to `queued`.
    pub jobs_reset: u64,
    /// Jobs in `queued` state once the reset has run. The count is taken after
    /// the reset, so it includes the `jobs_reset` jobs as well as any jobs a
    /// prior session had already queued.
    pub jobs_pending: u64,
    /// The cache DB file was missing at startup. Recovery itself creates
    /// nothing; the cache layer is expected to create an empty DB when it
    /// next opens the path.
    pub cache_recreated: bool,
    /// The cache DB was unreadable or failed its integrity check and was moved
    /// aside to a backup file, so a fresh DB can be created in its place.
    pub cache_rebuilt: bool,
}
24
25/// Run all startup recovery steps.
26///
27/// Must be called before any worker processes jobs or any search is run.
28pub fn run_startup_recovery(
29    catalog: &Catalog,
30    cache_db_path: &Path,
31) -> OrbokResult<RecoveryReport> {
32    let mut report = RecoveryReport::default();
33    report.jobs_reset = reset_interrupted_jobs(catalog)?;
34    report.jobs_pending = count_pending_jobs(catalog)?;
35    let cache_status = ensure_cache_db(cache_db_path)?;
36    report.cache_recreated = cache_status == CacheDbStatus::Recreated;
37    report.cache_rebuilt = cache_status == CacheDbStatus::Rebuilt;
38    if report.jobs_reset > 0 {
39        tracing::warn!(reset = report.jobs_reset, "reset interrupted jobs to queued on startup");
40    }
41    Ok(report)
42}
43
44/// RFC-018 §16 test 1: any job left in `running` state from a previous
45/// session is reset to `queued` so workers will retry it.
46fn reset_interrupted_jobs(catalog: &Catalog) -> OrbokResult<u64> {
47    let conn = catalog.lock();
48    let n = conn
49        .execute(
50            "UPDATE index_jobs SET status = 'queued', updated_at = ?1 WHERE status = 'running'",
51            rusqlite::params![now_iso8601()],
52        )
53        .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
54    Ok(n as u64)
55}
56
57fn count_pending_jobs(catalog: &Catalog) -> OrbokResult<u64> {
58    let conn = catalog.lock();
59    let n: i64 = conn
60        .query_row("SELECT COUNT(*) FROM index_jobs WHERE status = 'queued'", [], |r| r.get(0))
61        .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
62    Ok(n as u64)
63}
64
65#[derive(PartialEq)]
66enum CacheDbStatus {
67    Ok,
68    Recreated,
69    Rebuilt,
70}
71
72/// RFC-018 §16 test 3/4: ensure the cache DB is accessible.
73/// Missing → recreate empty. Corrupt → back up and recreate.
74fn ensure_cache_db(path: &Path) -> OrbokResult<CacheDbStatus> {
75    if !path.exists() {
76        // Missing: localcache will create it on first open; nothing to do.
77        return Ok(CacheDbStatus::Recreated);
78    }
79    // Integrity probe: open and run `PRAGMA integrity_check`.
80    match rusqlite::Connection::open(path) {
81        Ok(conn) => {
82            let result: String = conn
83                .query_row("PRAGMA integrity_check", [], |r| r.get(0))
84                .unwrap_or_else(|_| "error".to_string());
85            if result != "ok" {
86                tracing::error!(path = %path.display(), "cache DB corrupt — backing up and removing");
87                let backup = path.with_extension("sqlite3.corrupt-backup");
88                let _ = std::fs::rename(path, &backup);
89                return Ok(CacheDbStatus::Rebuilt);
90            }
91        }
92        Err(e) => {
93            tracing::error!(path = %path.display(), error = %e, "cache DB unreadable");
94            let backup = path.with_extension("sqlite3.corrupt-backup");
95            let _ = std::fs::rename(path, &backup);
96            return Ok(CacheDbStatus::Rebuilt);
97        }
98    }
99    Ok(CacheDbStatus::Ok)
100}
101
/// Catalog integrity report (RFC-018 §16 test 7).
///
/// Each field counts rows whose referenced parent row is missing.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct IntegrityReport {
    /// Chunks whose parent chunk no longer exists.
    pub orphaned_child_chunks: u64,
    /// Keyword index records without a matching chunk.
    pub orphaned_kw_records: u64,
    /// Embedding records without a matching chunk.
    pub orphaned_embedding_records: u64,
    /// Files without a parent source.
    pub orphaned_files: u64,
}

impl IntegrityReport {
    /// Returns `true` when no orphaned rows of any kind were found.
    #[must_use]
    pub fn is_clean(&self) -> bool {
        [
            self.orphaned_child_chunks,
            self.orphaned_kw_records,
            self.orphaned_embedding_records,
            self.orphaned_files,
        ]
        .iter()
        .all(|&count| count == 0)
    }
}
123
124/// Run catalog integrity checks (RFC-018 §16 test 7).
125/// Read-only — does not repair, only reports.
126pub fn check_catalog_integrity(catalog: &Catalog) -> OrbokResult<IntegrityReport> {
127    let conn = catalog.lock();
128    let mut report = IntegrityReport::default();
129    let q = |sql: &str| -> OrbokResult<u64> {
130        let n: i64 = conn
131            .query_row(sql, [], |r| r.get(0))
132            .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
133        Ok(n as u64)
134    };
135    report.orphaned_child_chunks = q(
136        "SELECT COUNT(*) FROM chunks c \
137         WHERE c.parent_chunk_id IS NOT NULL \
138         AND NOT EXISTS (SELECT 1 FROM chunks p WHERE p.chunk_id = c.parent_chunk_id)",
139    )?;
140    report.orphaned_kw_records = q(
141        "SELECT COUNT(*) FROM keyword_index_records k \
142         WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.chunk_id = k.chunk_id)",
143    )?;
144    report.orphaned_embedding_records = q(
145        "SELECT COUNT(*) FROM embeddings e \
146         WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.chunk_id = e.chunk_id)",
147    )?;
148    report.orphaned_files = q(
149        "SELECT COUNT(*) FROM files f \
150         WHERE NOT EXISTS (SELECT 1 FROM sources s WHERE s.source_id = f.source_id)",
151    )?;
152    Ok(report)
153}