Skip to main content

orbok_workers/
lib.rs

1//! # orbok-workers
2//!
3//! Synchronous pipeline workers for M5/M6: pull queued jobs from the
4//! catalog and execute them in dependency order.
5//!
6//! **Worker chain (per file):**
7//! ```text
8//! [Scan queues Extract]
9//!   → ExtractionWorker  (extract + cache + record)
10//!   → ChunkAndIndexWorker (chunk + FTS index + chunk_locations)
11//! ```
12//!
13//! Failure isolation: one file's failure never stops the whole run
14//! (RFC-004 §16, RFC-005 §13). Workers update the relevant catalog
15//! records with the error category.
16
17mod chunk_adapter;
18mod chunk_and_index;
19pub mod cleanup_service;
20mod embedding;
21mod extract;
22pub mod model_verifier;
23pub mod recovery;
24pub mod storage;
25
26#[cfg(test)]
27mod tests;
28
29pub use chunk_and_index::ChunkAndIndexWorker;
30pub use cleanup_service::{CleanupService, FullCleanupOutcome};
31pub use embedding::EmbeddingWorker;
32pub use extract::ExtractionWorker;
33pub use model_verifier::{
34    FileIssue, FileIssueKind, VerifyOutcome, verify_embedding_model, verify_outcome_summary,
35};
36pub use recovery::{
37    IntegrityReport, RecoveryReport, check_catalog_integrity, run_startup_recovery,
38};
39pub use storage::update_storage_accounting;
40
41use orbok_core::OrbokResult;
42use orbok_core::{JobStatus, JobType};
43use orbok_db::Catalog;
44use orbok_db::repo::IndexJobRepository;
45use tracing::warn;
46
47/// Run all queued jobs until the queue is empty or `limit` jobs have
48/// been processed. Returns the number of jobs that succeeded.
49pub fn run_pending(
50    catalog: &Catalog,
51    extract_worker: &ExtractionWorker<'_>,
52    chunk_worker: &ChunkAndIndexWorker<'_>,
53    embed_worker: Option<&EmbeddingWorker<'_>>,
54    limit: u32,
55) -> OrbokResult<u64> {
56    let jobs = IndexJobRepository::new(catalog);
57    let mut succeeded = 0u64;
58    let mut processed = 0u32;
59
60    while processed < limit {
61        let batch = jobs.list_queued(1)?;
62        if batch.is_empty() {
63            break;
64        }
65        let job = &batch[0];
66        jobs.set_status(&job.job_id, JobStatus::Running)?;
67        let result = match job.job_type {
68            JobType::Extract => {
69                if let Some(file_id) = &job.file_id {
70                    extract_worker.run(file_id)
71                } else {
72                    Ok(())
73                }
74            }
75            JobType::Chunk | JobType::KeywordIndex => {
76                if let Some(file_id) = &job.file_id {
77                    chunk_worker.run(file_id)
78                } else {
79                    Ok(())
80                }
81            }
82            JobType::Embedding => {
83                if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
84                    worker.run(file_id)
85                } else {
86                    Ok(())
87                }
88            }
89            _ => Ok(()), // Other job types are no-ops in v0.2.
90        };
91        match result {
92            Ok(()) => {
93                jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
94                succeeded += 1;
95            }
96            Err(e) => {
97                warn!(job = job.job_id.as_str(), error = %e, "job failed");
98                jobs.set_status(&job.job_id, JobStatus::Failed)?;
99            }
100        }
101        processed += 1;
102    }
103    Ok(succeeded)
104}