Skip to main content

orbok_workers/
lib.rs

1//! # orbok-workers
2//!
3//! Synchronous pipeline workers for M5/M6: pull queued jobs from the
4//! catalog and execute them in dependency order.
5//!
6//! **Worker chain (per file):**
7//! ```text
8//! [Scan queues Extract]
9//!   → ExtractionWorker  (extract + cache + record)
10//!   → ChunkAndIndexWorker (chunk + FTS index + chunk_locations)
11//! ```
12//!
13//! Failure isolation: one file's failure never stops the whole run
14//! (RFC-004 §16, RFC-005 §13). Workers update the relevant catalog
15//! records with the error category.
16
17mod chunk_and_index;
18mod embedding;
19mod extract;
20pub mod recovery;
21pub mod storage;
22
23#[cfg(test)]
24mod tests;
25
26pub use chunk_and_index::ChunkAndIndexWorker;
27pub use embedding::EmbeddingWorker;
28pub use extract::ExtractionWorker;
29pub use recovery::{IntegrityReport, RecoveryReport, check_catalog_integrity, run_startup_recovery};
30pub use storage::update_storage_accounting;
31
32use orbok_core::OrbokResult;
33use orbok_db::Catalog;
34use orbok_core::{JobStatus, JobType};
35use orbok_db::repo::IndexJobRepository;
36use tracing::warn;
37
38/// Run all queued jobs until the queue is empty or `limit` jobs have
39/// been processed. Returns the number of jobs that succeeded.
40pub fn run_pending(
41    catalog: &Catalog,
42    extract_worker: &ExtractionWorker<'_>,
43    chunk_worker: &ChunkAndIndexWorker<'_>,
44    embed_worker: Option<&EmbeddingWorker<'_>>,
45    limit: u32,
46) -> OrbokResult<u64> {
47    let jobs = IndexJobRepository::new(catalog);
48    let mut succeeded = 0u64;
49    let mut processed = 0u32;
50
51    while processed < limit {
52        let batch = jobs.list_queued(1)?;
53        if batch.is_empty() {
54            break;
55        }
56        let job = &batch[0];
57        jobs.set_status(&job.job_id, JobStatus::Running)?;
58        let result = match job.job_type {
59            JobType::Extract => {
60                if let Some(file_id) = &job.file_id {
61                    extract_worker.run(file_id)
62                } else {
63                    Ok(())
64                }
65            }
66            JobType::Chunk | JobType::KeywordIndex => {
67                if let Some(file_id) = &job.file_id {
68                    chunk_worker.run(file_id)
69                } else {
70                    Ok(())
71                }
72            }
73            JobType::Embedding => {
74                if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
75                    worker.run(file_id)
76                } else {
77                    Ok(())
78                }
79            }
80            _ => Ok(()), // Other job types are no-ops in v0.2.
81        };
82        match result {
83            Ok(()) => {
84                jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
85                succeeded += 1;
86            }
87            Err(e) => {
88                warn!(job = job.job_id.as_str(), error = %e, "job failed");
89                jobs.set_status(&job.job_id, JobStatus::Failed)?;
90            }
91        }
92        processed += 1;
93    }
94    Ok(succeeded)
95}