Skip to main content

orbok_workers/
lib.rs

//! # orbok-workers
//!
//! Synchronous pipeline workers for M5/M6: pull queued jobs from the
//! catalog and execute them in dependency order.
//!
//! **Worker chain (per file):**
//! ```text
//! [Scan queues Extract]
//!   → ExtractionWorker  (extract + cache + record)
//!   → ChunkAndIndexWorker (chunk + FTS index + chunk_locations)
//! ```
//!
//! Failure isolation: one file's failure never stops the whole run
//! (RFC-004 §16, RFC-005 §13). Workers update the relevant catalog
//! records with the error category.
16
17mod extract;
18mod chunk_and_index;
19
20#[cfg(test)]
21mod tests;
22
23pub use extract::ExtractionWorker;
24pub use chunk_and_index::ChunkAndIndexWorker;
25
26use orbok_core::OrbokResult;
27use orbok_db::Catalog;
28use orbok_core::{JobStatus, JobType};
29use orbok_db::repo::IndexJobRepository;
30use tracing::warn;
31
32/// Run all queued jobs until the queue is empty or `limit` jobs have
33/// been processed. Returns the number of jobs that succeeded.
34pub fn run_pending(
35    catalog: &Catalog,
36    extract_worker: &ExtractionWorker<'_>,
37    chunk_worker: &ChunkAndIndexWorker<'_>,
38    limit: u32,
39) -> OrbokResult<u64> {
40    let jobs = IndexJobRepository::new(catalog);
41    let mut succeeded = 0u64;
42    let mut processed = 0u32;
43
44    while processed < limit {
45        let batch = jobs.list_queued(1)?;
46        if batch.is_empty() {
47            break;
48        }
49        let job = &batch[0];
50        jobs.set_status(&job.job_id, JobStatus::Running)?;
51        let result = match job.job_type {
52            JobType::Extract => {
53                if let Some(file_id) = &job.file_id {
54                    extract_worker.run(file_id)
55                } else {
56                    Ok(())
57                }
58            }
59            JobType::Chunk | JobType::KeywordIndex => {
60                if let Some(file_id) = &job.file_id {
61                    chunk_worker.run(file_id)
62                } else {
63                    Ok(())
64                }
65            }
66            _ => Ok(()), // Other job types are no-ops in v0.2.
67        };
68        match result {
69            Ok(()) => {
70                jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
71                succeeded += 1;
72            }
73            Err(e) => {
74                warn!(job = job.job_id.as_str(), error = %e, "job failed");
75                jobs.set_status(&job.job_id, JobStatus::Failed)?;
76            }
77        }
78        processed += 1;
79    }
80    Ok(succeeded)
81}