1mod chunk_adapter;
18mod chunk_and_index;
19pub mod cleanup_service;
20mod embedding;
21mod extract;
22pub mod model_verifier;
23pub mod recovery;
24pub mod storage;
25
26#[cfg(test)]
27mod tests;
28
29pub use chunk_and_index::ChunkAndIndexWorker;
30pub use cleanup_service::{CleanupService, FullCleanupOutcome};
31pub use embedding::EmbeddingWorker;
32pub use extract::ExtractionWorker;
33pub use model_verifier::{
34 FileIssue, FileIssueKind, VerifyOutcome, verify_embedding_model, verify_outcome_summary,
35};
36pub use recovery::{
37 IntegrityReport, RecoveryReport, check_catalog_integrity, run_startup_recovery,
38};
39pub use storage::update_storage_accounting;
40
41use orbok_core::OrbokResult;
42use orbok_core::{JobStatus, JobType};
43use orbok_db::Catalog;
44use orbok_db::repo::IndexJobRepository;
45use tracing::warn;
46
47pub fn run_pending(
50 catalog: &Catalog,
51 extract_worker: &ExtractionWorker<'_>,
52 chunk_worker: &ChunkAndIndexWorker<'_>,
53 embed_worker: Option<&EmbeddingWorker<'_>>,
54 limit: u32,
55) -> OrbokResult<u64> {
56 let jobs = IndexJobRepository::new(catalog);
57 let mut succeeded = 0u64;
58 let mut processed = 0u32;
59
60 while processed < limit {
61 let batch = jobs.list_queued(1)?;
62 if batch.is_empty() {
63 break;
64 }
65 let job = &batch[0];
66 jobs.set_status(&job.job_id, JobStatus::Running)?;
67 let result = match job.job_type {
68 JobType::Extract => {
69 if let Some(file_id) = &job.file_id {
70 extract_worker.run(file_id)
71 } else {
72 Ok(())
73 }
74 }
75 JobType::Chunk | JobType::KeywordIndex => {
76 if let Some(file_id) = &job.file_id {
77 chunk_worker.run(file_id)
78 } else {
79 Ok(())
80 }
81 }
82 JobType::Embedding => {
83 if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
84 worker.run(file_id)
85 } else {
86 Ok(())
87 }
88 }
89 _ => Ok(()), };
91 match result {
92 Ok(()) => {
93 jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
94 succeeded += 1;
95 }
96 Err(e) => {
97 warn!(job = job.job_id.as_str(), error = %e, "job failed");
98 jobs.set_status(&job.job_id, JobStatus::Failed)?;
99 }
100 }
101 processed += 1;
102 }
103 Ok(succeeded)
104}