1mod chunk_and_index;
18mod embedding;
19mod extract;
20pub mod cleanup_service;
21pub mod model_verifier;
22pub mod recovery;
23pub mod storage;
24
25#[cfg(test)]
26mod tests;
27
28pub use chunk_and_index::ChunkAndIndexWorker;
29pub use embedding::EmbeddingWorker;
30pub use extract::ExtractionWorker;
31pub use cleanup_service::{CleanupService, FullCleanupOutcome};
32pub use model_verifier::{FileIssue, FileIssueKind, VerifyOutcome, verify_embedding_model, verify_outcome_summary};
33pub use recovery::{IntegrityReport, RecoveryReport, check_catalog_integrity, run_startup_recovery};
34pub use storage::update_storage_accounting;
35
36use orbok_core::OrbokResult;
37use orbok_db::Catalog;
38use orbok_core::{JobStatus, JobType};
39use orbok_db::repo::IndexJobRepository;
40use tracing::warn;
41
42pub fn run_pending(
45 catalog: &Catalog,
46 extract_worker: &ExtractionWorker<'_>,
47 chunk_worker: &ChunkAndIndexWorker<'_>,
48 embed_worker: Option<&EmbeddingWorker<'_>>,
49 limit: u32,
50) -> OrbokResult<u64> {
51 let jobs = IndexJobRepository::new(catalog);
52 let mut succeeded = 0u64;
53 let mut processed = 0u32;
54
55 while processed < limit {
56 let batch = jobs.list_queued(1)?;
57 if batch.is_empty() {
58 break;
59 }
60 let job = &batch[0];
61 jobs.set_status(&job.job_id, JobStatus::Running)?;
62 let result = match job.job_type {
63 JobType::Extract => {
64 if let Some(file_id) = &job.file_id {
65 extract_worker.run(file_id)
66 } else {
67 Ok(())
68 }
69 }
70 JobType::Chunk | JobType::KeywordIndex => {
71 if let Some(file_id) = &job.file_id {
72 chunk_worker.run(file_id)
73 } else {
74 Ok(())
75 }
76 }
77 JobType::Embedding => {
78 if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
79 worker.run(file_id)
80 } else {
81 Ok(())
82 }
83 }
84 _ => Ok(()), };
86 match result {
87 Ok(()) => {
88 jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
89 succeeded += 1;
90 }
91 Err(e) => {
92 warn!(job = job.job_id.as_str(), error = %e, "job failed");
93 jobs.set_status(&job.job_id, JobStatus::Failed)?;
94 }
95 }
96 processed += 1;
97 }
98 Ok(succeeded)
99}