1mod extract;
18mod chunk_and_index;
19
20#[cfg(test)]
21mod tests;
22
23pub use extract::ExtractionWorker;
24pub use chunk_and_index::ChunkAndIndexWorker;
25
26use orbok_core::OrbokResult;
27use orbok_db::Catalog;
28use orbok_core::{JobStatus, JobType};
29use orbok_db::repo::IndexJobRepository;
30use tracing::warn;
31
32pub fn run_pending(
35 catalog: &Catalog,
36 extract_worker: &ExtractionWorker<'_>,
37 chunk_worker: &ChunkAndIndexWorker<'_>,
38 limit: u32,
39) -> OrbokResult<u64> {
40 let jobs = IndexJobRepository::new(catalog);
41 let mut succeeded = 0u64;
42 let mut processed = 0u32;
43
44 while processed < limit {
45 let batch = jobs.list_queued(1)?;
46 if batch.is_empty() {
47 break;
48 }
49 let job = &batch[0];
50 jobs.set_status(&job.job_id, JobStatus::Running)?;
51 let result = match job.job_type {
52 JobType::Extract => {
53 if let Some(file_id) = &job.file_id {
54 extract_worker.run(file_id)
55 } else {
56 Ok(())
57 }
58 }
59 JobType::Chunk | JobType::KeywordIndex => {
60 if let Some(file_id) = &job.file_id {
61 chunk_worker.run(file_id)
62 } else {
63 Ok(())
64 }
65 }
66 _ => Ok(()), };
68 match result {
69 Ok(()) => {
70 jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
71 succeeded += 1;
72 }
73 Err(e) => {
74 warn!(job = job.job_id.as_str(), error = %e, "job failed");
75 jobs.set_status(&job.job_id, JobStatus::Failed)?;
76 }
77 }
78 processed += 1;
79 }
80 Ok(succeeded)
81}