1mod chunk_and_index;
18pub mod cleanup_service;
19mod embedding;
20mod extract;
21pub mod model_verifier;
22pub mod recovery;
23pub mod storage;
24
25#[cfg(test)]
26mod tests;
27
28pub use chunk_and_index::ChunkAndIndexWorker;
29pub use cleanup_service::{CleanupService, FullCleanupOutcome};
30pub use embedding::EmbeddingWorker;
31pub use extract::ExtractionWorker;
32pub use model_verifier::{
33 FileIssue, FileIssueKind, VerifyOutcome, verify_embedding_model, verify_outcome_summary,
34};
35pub use recovery::{
36 IntegrityReport, RecoveryReport, check_catalog_integrity, run_startup_recovery,
37};
38pub use storage::update_storage_accounting;
39
40use orbok_core::OrbokResult;
41use orbok_core::{JobStatus, JobType};
42use orbok_db::Catalog;
43use orbok_db::repo::IndexJobRepository;
44use tracing::warn;
45
46pub fn run_pending(
49 catalog: &Catalog,
50 extract_worker: &ExtractionWorker<'_>,
51 chunk_worker: &ChunkAndIndexWorker<'_>,
52 embed_worker: Option<&EmbeddingWorker<'_>>,
53 limit: u32,
54) -> OrbokResult<u64> {
55 let jobs = IndexJobRepository::new(catalog);
56 let mut succeeded = 0u64;
57 let mut processed = 0u32;
58
59 while processed < limit {
60 let batch = jobs.list_queued(1)?;
61 if batch.is_empty() {
62 break;
63 }
64 let job = &batch[0];
65 jobs.set_status(&job.job_id, JobStatus::Running)?;
66 let result = match job.job_type {
67 JobType::Extract => {
68 if let Some(file_id) = &job.file_id {
69 extract_worker.run(file_id)
70 } else {
71 Ok(())
72 }
73 }
74 JobType::Chunk | JobType::KeywordIndex => {
75 if let Some(file_id) = &job.file_id {
76 chunk_worker.run(file_id)
77 } else {
78 Ok(())
79 }
80 }
81 JobType::Embedding => {
82 if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
83 worker.run(file_id)
84 } else {
85 Ok(())
86 }
87 }
88 _ => Ok(()), };
90 match result {
91 Ok(()) => {
92 jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
93 succeeded += 1;
94 }
95 Err(e) => {
96 warn!(job = job.job_id.as_str(), error = %e, "job failed");
97 jobs.set_status(&job.job_id, JobStatus::Failed)?;
98 }
99 }
100 processed += 1;
101 }
102 Ok(succeeded)
103}