Skip to main content

orbok_workers/
lib.rs

1//! # orbok-workers
2//!
3//! Synchronous pipeline workers for M5/M6: pull queued jobs from the
4//! catalog and execute them in dependency order.
5//!
6//! **Worker chain (per file):**
7//! ```text
8//! [Scan queues Extract]
9//!   → ExtractionWorker  (extract + cache + record)
10//!   → ChunkAndIndexWorker (chunk + FTS index + chunk_locations)
11//! ```
12//!
13//! Failure isolation: one file's failure never stops the whole run
14//! (RFC-004 §16, RFC-005 §13). Workers update the relevant catalog
15//! records with the error category.
16//!
17//! RFC-036 adds the resource-aware `Scheduler` with bounded queues,
18//! priority dispatch, backpressure, pause/resume/cancel, and crash
19//! recovery.
20
21mod chunk_adapter;
22mod chunk_and_index;
23pub mod cleanup_service;
24mod embedding;
25mod extract;
26pub mod model_verifier;
27pub mod recovery;
28pub mod scheduler;
29pub mod storage;
30
31#[cfg(test)]
32mod tests;
33
34pub use chunk_and_index::ChunkAndIndexWorker;
35pub use cleanup_service::{CleanupService, FullCleanupOutcome};
36pub use embedding::EmbeddingWorker;
37pub use extract::ExtractionWorker;
38pub use model_verifier::{
39    FileIssue, FileIssueKind, VerifyOutcome, verify_embedding_model, verify_outcome_summary,
40};
41pub use recovery::{
42    IntegrityReport, RecoveryReport, check_catalog_integrity, run_startup_recovery,
43};
44pub use scheduler::{
45    IndexJob, JobKind, JobState, QueueCapacity, QueueKind, ResourceMode, Scheduler,
46    SchedulerConfig, SchedulerEvent, SchedulerLimits, WorkPriority,
47};
48pub use storage::update_storage_accounting;
49
50use orbok_core::OrbokResult;
51use orbok_core::{JobStatus, JobType};
52use orbok_db::Catalog;
53use orbok_db::repo::IndexJobRepository;
54use tracing::warn;
55
56/// Run all queued jobs until the queue is empty or `limit` jobs have
57/// been processed. Returns the number of jobs that succeeded.
58///
59/// This is the legacy synchronous dispatch loop, retained for tests
60/// and simple callers. Production code should use `Scheduler::tick()`
61/// for resource-aware dispatch (RFC-036).
62pub fn run_pending(
63    catalog: &Catalog,
64    extract_worker: &ExtractionWorker<'_>,
65    chunk_worker: &ChunkAndIndexWorker<'_>,
66    embed_worker: Option<&EmbeddingWorker<'_>>,
67    limit: u32,
68) -> OrbokResult<u64> {
69    let jobs = IndexJobRepository::new(catalog);
70    let mut succeeded = 0u64;
71    let mut processed = 0u32;
72
73    while processed < limit {
74        let batch = jobs.list_queued(1)?;
75        if batch.is_empty() {
76            break;
77        }
78        let job = &batch[0];
79        jobs.set_status(&job.job_id, JobStatus::Running)?;
80        let result = match job.job_type {
81            JobType::Extract => {
82                if let Some(file_id) = &job.file_id {
83                    extract_worker.run(file_id)
84                } else {
85                    Ok(())
86                }
87            }
88            JobType::Chunk | JobType::KeywordIndex => {
89                if let Some(file_id) = &job.file_id {
90                    chunk_worker.run(file_id)
91                } else {
92                    Ok(())
93                }
94            }
95            JobType::Embedding => {
96                if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
97                    worker.run(file_id)
98                } else {
99                    Ok(())
100                }
101            }
102            _ => Ok(()), // Other job types are no-ops in v0.2.
103        };
104        match result {
105            Ok(()) => {
106                jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
107                succeeded += 1;
108            }
109            Err(e) => {
110                warn!(job = job.job_id.as_str(), error = %e, "job failed");
111                jobs.set_status(&job.job_id, JobStatus::Failed)?;
112            }
113        }
114        processed += 1;
115    }
116    Ok(succeeded)
117}