Skip to main content

orbok_workers/
lib.rs

1//! # orbok-workers
2//!
3//! Synchronous pipeline workers for M5/M6: pull queued jobs from the
4//! catalog and execute them in dependency order.
5//!
6//! **Worker chain (per file):**
7//! ```text
8//! [Scan queues Extract]
9//!   → ExtractionWorker  (extract + cache + record)
10//!   → ChunkAndIndexWorker (chunk + FTS index + chunk_locations)
11//! ```
12//!
13//! Failure isolation: one file's failure never stops the whole run
14//! (RFC-004 §16, RFC-005 §13). Workers update the relevant catalog
15//! records with the error category.
16
17mod chunk_and_index;
18mod embedding;
19mod extract;
20
21#[cfg(test)]
22mod tests;
23
24pub use chunk_and_index::ChunkAndIndexWorker;
25pub use embedding::EmbeddingWorker;
26pub use extract::ExtractionWorker;
27
28use orbok_core::OrbokResult;
29use orbok_db::Catalog;
30use orbok_core::{JobStatus, JobType};
31use orbok_db::repo::IndexJobRepository;
32use tracing::warn;
33
34/// Run all queued jobs until the queue is empty or `limit` jobs have
35/// been processed. Returns the number of jobs that succeeded.
36pub fn run_pending(
37    catalog: &Catalog,
38    extract_worker: &ExtractionWorker<'_>,
39    chunk_worker: &ChunkAndIndexWorker<'_>,
40    embed_worker: Option<&EmbeddingWorker<'_>>,
41    limit: u32,
42) -> OrbokResult<u64> {
43    let jobs = IndexJobRepository::new(catalog);
44    let mut succeeded = 0u64;
45    let mut processed = 0u32;
46
47    while processed < limit {
48        let batch = jobs.list_queued(1)?;
49        if batch.is_empty() {
50            break;
51        }
52        let job = &batch[0];
53        jobs.set_status(&job.job_id, JobStatus::Running)?;
54        let result = match job.job_type {
55            JobType::Extract => {
56                if let Some(file_id) = &job.file_id {
57                    extract_worker.run(file_id)
58                } else {
59                    Ok(())
60                }
61            }
62            JobType::Chunk | JobType::KeywordIndex => {
63                if let Some(file_id) = &job.file_id {
64                    chunk_worker.run(file_id)
65                } else {
66                    Ok(())
67                }
68            }
69            JobType::Embedding => {
70                if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
71                    worker.run(file_id)
72                } else {
73                    Ok(())
74                }
75            }
76            _ => Ok(()), // Other job types are no-ops in v0.2.
77        };
78        match result {
79            Ok(()) => {
80                jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
81                succeeded += 1;
82            }
83            Err(e) => {
84                warn!(job = job.job_id.as_str(), error = %e, "job failed");
85                jobs.set_status(&job.job_id, JobStatus::Failed)?;
86            }
87        }
88        processed += 1;
89    }
90    Ok(succeeded)
91}