use orbok_core::{OrbokResult, now_iso8601};
use orbok_db::Catalog;
use std::path::Path;
/// Summary of what [`run_startup_recovery`] did.
#[derive(Default, Debug)]
pub struct RecoveryReport {
    /// Number of `index_jobs` rows moved from `running` back to `queued`.
    pub jobs_reset: u64,
    /// Number of `index_jobs` rows in `queued` state, counted after the reset.
    pub jobs_pending: u64,
    /// `true` when no cache database file existed at the configured path.
    pub cache_recreated: bool,
    /// `true` when the cache database was unreadable or failed its integrity check.
    pub cache_rebuilt: bool,
}
pub fn run_startup_recovery(
catalog: &Catalog,
cache_db_path: &Path,
) -> OrbokResult<RecoveryReport> {
let mut report = RecoveryReport::default();
report.jobs_reset = reset_interrupted_jobs(catalog)?;
report.jobs_pending = count_pending_jobs(catalog)?;
let cache_status = ensure_cache_db(cache_db_path)?;
report.cache_recreated = cache_status == CacheDbStatus::Recreated;
report.cache_rebuilt = cache_status == CacheDbStatus::Rebuilt;
if report.jobs_reset > 0 {
tracing::warn!(reset = report.jobs_reset, "reset interrupted jobs to queued on startup");
}
Ok(report)
}
fn reset_interrupted_jobs(catalog: &Catalog) -> OrbokResult<u64> {
let conn = catalog.lock();
let n = conn
.execute(
"UPDATE index_jobs SET status = 'queued', updated_at = ?1 WHERE status = 'running'",
rusqlite::params![now_iso8601()],
)
.map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
Ok(n as u64)
}
/// Counts the `index_jobs` rows currently in `queued` state.
///
/// # Errors
/// Any SQLite failure is mapped to `OrbokError::Database`.
fn count_pending_jobs(catalog: &Catalog) -> OrbokResult<u64> {
    const COUNT_QUEUED_SQL: &str = "SELECT COUNT(*) FROM index_jobs WHERE status = 'queued'";

    let conn = catalog.lock();
    let queued = conn
        .query_row(COUNT_QUEUED_SQL, [], |row| row.get::<_, i64>(0))
        .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
    // COUNT(*) is never negative, so the sign conversion cannot wrap.
    Ok(queued as u64)
}
/// Outcome of the startup health check on the cache database file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CacheDbStatus {
    /// The existing file opened and `PRAGMA integrity_check` returned `ok`.
    Ok,
    /// No file existed at the cache path.
    Recreated,
    /// The file could not be opened or failed its integrity check; a backup
    /// rename was attempted.
    Rebuilt,
}
fn ensure_cache_db(path: &Path) -> OrbokResult<CacheDbStatus> {
if !path.exists() {
return Ok(CacheDbStatus::Recreated);
}
match rusqlite::Connection::open(path) {
Ok(conn) => {
let result: String = conn
.query_row("PRAGMA integrity_check", [], |r| r.get(0))
.unwrap_or_else(|_| "error".to_string());
if result != "ok" {
tracing::error!(path = %path.display(), "cache DB corrupt — backing up and removing");
let backup = path.with_extension("sqlite3.corrupt-backup");
let _ = std::fs::rename(path, &backup);
return Ok(CacheDbStatus::Rebuilt);
}
}
Err(e) => {
tracing::error!(path = %path.display(), error = %e, "cache DB unreadable");
let backup = path.with_extension("sqlite3.corrupt-backup");
let _ = std::fs::rename(path, &backup);
return Ok(CacheDbStatus::Rebuilt);
}
}
Ok(CacheDbStatus::Ok)
}
/// Counts of catalog rows whose referenced parent row no longer exists.
#[derive(Default, Debug)]
pub struct IntegrityReport {
    /// `chunks` rows with a non-NULL `parent_chunk_id` that matches no chunk.
    pub orphaned_child_chunks: u64,
    /// `keyword_index_records` rows whose `chunk_id` matches no chunk.
    pub orphaned_kw_records: u64,
    /// `embeddings` rows whose `chunk_id` matches no chunk.
    pub orphaned_embedding_records: u64,
    /// `files` rows whose `source_id` matches no source.
    pub orphaned_files: u64,
}

impl IntegrityReport {
    /// Returns `true` when no orphaned rows of any kind were found.
    pub fn is_clean(&self) -> bool {
        let counters = [
            self.orphaned_child_chunks,
            self.orphaned_kw_records,
            self.orphaned_embedding_records,
            self.orphaned_files,
        ];
        counters.iter().all(|&count| count == 0)
    }
}
pub fn check_catalog_integrity(catalog: &Catalog) -> OrbokResult<IntegrityReport> {
let conn = catalog.lock();
let mut report = IntegrityReport::default();
let q = |sql: &str| -> OrbokResult<u64> {
let n: i64 = conn
.query_row(sql, [], |r| r.get(0))
.map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
Ok(n as u64)
};
report.orphaned_child_chunks = q(
"SELECT COUNT(*) FROM chunks c \
WHERE c.parent_chunk_id IS NOT NULL \
AND NOT EXISTS (SELECT 1 FROM chunks p WHERE p.chunk_id = c.parent_chunk_id)",
)?;
report.orphaned_kw_records = q(
"SELECT COUNT(*) FROM keyword_index_records k \
WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.chunk_id = k.chunk_id)",
)?;
report.orphaned_embedding_records = q(
"SELECT COUNT(*) FROM embeddings e \
WHERE NOT EXISTS (SELECT 1 FROM chunks c WHERE c.chunk_id = e.chunk_id)",
)?;
report.orphaned_files = q(
"SELECT COUNT(*) FROM files f \
WHERE NOT EXISTS (SELECT 1 FROM sources s WHERE s.source_id = f.source_id)",
)?;
Ok(report)
}