Skip to main content

orbok_workers/
lib.rs

//! # orbok-workers
//!
//! Synchronous pipeline workers for M5/M6: pull queued jobs from the
//! catalog and execute them in dependency order.
//!
//! **Worker chain (per file):**
//! ```text
//! [Scan queues Extract]
//!   → ExtractionWorker  (extract + cache + record)
//!   → ChunkAndIndexWorker (chunk + FTS index + chunk_locations)
//! ```
//!
//! `Embedding` jobs are handled by `EmbeddingWorker` when one is passed to
//! [`run_pending`]; without one, they are marked succeeded without being run.
//!
//! Failure isolation: one file's failure never stops the whole run
//! (RFC-004 §16, RFC-005 §13). Workers update the relevant catalog
//! records with the error category.
16
17mod chunk_and_index;
18pub mod cleanup_service;
19mod embedding;
20mod extract;
21pub mod model_verifier;
22pub mod recovery;
23pub mod storage;
24
25#[cfg(test)]
26mod tests;
27
28pub use chunk_and_index::ChunkAndIndexWorker;
29pub use cleanup_service::{CleanupService, FullCleanupOutcome};
30pub use embedding::EmbeddingWorker;
31pub use extract::ExtractionWorker;
32pub use model_verifier::{
33    FileIssue, FileIssueKind, VerifyOutcome, verify_embedding_model, verify_outcome_summary,
34};
35pub use recovery::{
36    IntegrityReport, RecoveryReport, check_catalog_integrity, run_startup_recovery,
37};
38pub use storage::update_storage_accounting;
39
40use orbok_core::OrbokResult;
41use orbok_core::{JobStatus, JobType};
42use orbok_db::Catalog;
43use orbok_db::repo::IndexJobRepository;
44use tracing::warn;
45
46/// Run all queued jobs until the queue is empty or `limit` jobs have
47/// been processed. Returns the number of jobs that succeeded.
48pub fn run_pending(
49    catalog: &Catalog,
50    extract_worker: &ExtractionWorker<'_>,
51    chunk_worker: &ChunkAndIndexWorker<'_>,
52    embed_worker: Option<&EmbeddingWorker<'_>>,
53    limit: u32,
54) -> OrbokResult<u64> {
55    let jobs = IndexJobRepository::new(catalog);
56    let mut succeeded = 0u64;
57    let mut processed = 0u32;
58
59    while processed < limit {
60        let batch = jobs.list_queued(1)?;
61        if batch.is_empty() {
62            break;
63        }
64        let job = &batch[0];
65        jobs.set_status(&job.job_id, JobStatus::Running)?;
66        let result = match job.job_type {
67            JobType::Extract => {
68                if let Some(file_id) = &job.file_id {
69                    extract_worker.run(file_id)
70                } else {
71                    Ok(())
72                }
73            }
74            JobType::Chunk | JobType::KeywordIndex => {
75                if let Some(file_id) = &job.file_id {
76                    chunk_worker.run(file_id)
77                } else {
78                    Ok(())
79                }
80            }
81            JobType::Embedding => {
82                if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
83                    worker.run(file_id)
84                } else {
85                    Ok(())
86                }
87            }
88            _ => Ok(()), // Other job types are no-ops in v0.2.
89        };
90        match result {
91            Ok(()) => {
92                jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
93                succeeded += 1;
94            }
95            Err(e) => {
96                warn!(job = job.job_id.as_str(), error = %e, "job failed");
97                jobs.set_status(&job.job_id, JobStatus::Failed)?;
98            }
99        }
100        processed += 1;
101    }
102    Ok(succeeded)
103}