1mod chunk_and_index;
18mod embedding;
19mod extract;
20pub mod cleanup_service;
21pub mod recovery;
22pub mod storage;
23
24#[cfg(test)]
25mod tests;
26
27pub use chunk_and_index::ChunkAndIndexWorker;
28pub use embedding::EmbeddingWorker;
29pub use extract::ExtractionWorker;
30pub use cleanup_service::{CleanupService, FullCleanupOutcome};
31pub use recovery::{IntegrityReport, RecoveryReport, check_catalog_integrity, run_startup_recovery};
32pub use storage::update_storage_accounting;
33
34use orbok_core::OrbokResult;
35use orbok_db::Catalog;
36use orbok_core::{JobStatus, JobType};
37use orbok_db::repo::IndexJobRepository;
38use tracing::warn;
39
40pub fn run_pending(
43 catalog: &Catalog,
44 extract_worker: &ExtractionWorker<'_>,
45 chunk_worker: &ChunkAndIndexWorker<'_>,
46 embed_worker: Option<&EmbeddingWorker<'_>>,
47 limit: u32,
48) -> OrbokResult<u64> {
49 let jobs = IndexJobRepository::new(catalog);
50 let mut succeeded = 0u64;
51 let mut processed = 0u32;
52
53 while processed < limit {
54 let batch = jobs.list_queued(1)?;
55 if batch.is_empty() {
56 break;
57 }
58 let job = &batch[0];
59 jobs.set_status(&job.job_id, JobStatus::Running)?;
60 let result = match job.job_type {
61 JobType::Extract => {
62 if let Some(file_id) = &job.file_id {
63 extract_worker.run(file_id)
64 } else {
65 Ok(())
66 }
67 }
68 JobType::Chunk | JobType::KeywordIndex => {
69 if let Some(file_id) = &job.file_id {
70 chunk_worker.run(file_id)
71 } else {
72 Ok(())
73 }
74 }
75 JobType::Embedding => {
76 if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
77 worker.run(file_id)
78 } else {
79 Ok(())
80 }
81 }
82 _ => Ok(()), };
84 match result {
85 Ok(()) => {
86 jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
87 succeeded += 1;
88 }
89 Err(e) => {
90 warn!(job = job.job_id.as_str(), error = %e, "job failed");
91 jobs.set_status(&job.job_id, JobStatus::Failed)?;
92 }
93 }
94 processed += 1;
95 }
96 Ok(succeeded)
97}