1mod chunk_and_index;
18mod embedding;
19mod extract;
20pub mod recovery;
21pub mod storage;
22
23#[cfg(test)]
24mod tests;
25
26pub use chunk_and_index::ChunkAndIndexWorker;
27pub use embedding::EmbeddingWorker;
28pub use extract::ExtractionWorker;
29pub use recovery::{IntegrityReport, RecoveryReport, check_catalog_integrity, run_startup_recovery};
30pub use storage::update_storage_accounting;
31
32use orbok_core::OrbokResult;
33use orbok_db::Catalog;
34use orbok_core::{JobStatus, JobType};
35use orbok_db::repo::IndexJobRepository;
36use tracing::warn;
37
38pub fn run_pending(
41 catalog: &Catalog,
42 extract_worker: &ExtractionWorker<'_>,
43 chunk_worker: &ChunkAndIndexWorker<'_>,
44 embed_worker: Option<&EmbeddingWorker<'_>>,
45 limit: u32,
46) -> OrbokResult<u64> {
47 let jobs = IndexJobRepository::new(catalog);
48 let mut succeeded = 0u64;
49 let mut processed = 0u32;
50
51 while processed < limit {
52 let batch = jobs.list_queued(1)?;
53 if batch.is_empty() {
54 break;
55 }
56 let job = &batch[0];
57 jobs.set_status(&job.job_id, JobStatus::Running)?;
58 let result = match job.job_type {
59 JobType::Extract => {
60 if let Some(file_id) = &job.file_id {
61 extract_worker.run(file_id)
62 } else {
63 Ok(())
64 }
65 }
66 JobType::Chunk | JobType::KeywordIndex => {
67 if let Some(file_id) = &job.file_id {
68 chunk_worker.run(file_id)
69 } else {
70 Ok(())
71 }
72 }
73 JobType::Embedding => {
74 if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
75 worker.run(file_id)
76 } else {
77 Ok(())
78 }
79 }
80 _ => Ok(()), };
82 match result {
83 Ok(()) => {
84 jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
85 succeeded += 1;
86 }
87 Err(e) => {
88 warn!(job = job.job_id.as_str(), error = %e, "job failed");
89 jobs.set_status(&job.job_id, JobStatus::Failed)?;
90 }
91 }
92 processed += 1;
93 }
94 Ok(succeeded)
95}