1mod chunk_and_index;
18mod embedding;
19mod extract;
20
21#[cfg(test)]
22mod tests;
23
24pub use chunk_and_index::ChunkAndIndexWorker;
25pub use embedding::EmbeddingWorker;
26pub use extract::ExtractionWorker;
27
28use orbok_core::OrbokResult;
29use orbok_db::Catalog;
30use orbok_core::{JobStatus, JobType};
31use orbok_db::repo::IndexJobRepository;
32use tracing::warn;
33
34pub fn run_pending(
37 catalog: &Catalog,
38 extract_worker: &ExtractionWorker<'_>,
39 chunk_worker: &ChunkAndIndexWorker<'_>,
40 embed_worker: Option<&EmbeddingWorker<'_>>,
41 limit: u32,
42) -> OrbokResult<u64> {
43 let jobs = IndexJobRepository::new(catalog);
44 let mut succeeded = 0u64;
45 let mut processed = 0u32;
46
47 while processed < limit {
48 let batch = jobs.list_queued(1)?;
49 if batch.is_empty() {
50 break;
51 }
52 let job = &batch[0];
53 jobs.set_status(&job.job_id, JobStatus::Running)?;
54 let result = match job.job_type {
55 JobType::Extract => {
56 if let Some(file_id) = &job.file_id {
57 extract_worker.run(file_id)
58 } else {
59 Ok(())
60 }
61 }
62 JobType::Chunk | JobType::KeywordIndex => {
63 if let Some(file_id) = &job.file_id {
64 chunk_worker.run(file_id)
65 } else {
66 Ok(())
67 }
68 }
69 JobType::Embedding => {
70 if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
71 worker.run(file_id)
72 } else {
73 Ok(())
74 }
75 }
76 _ => Ok(()), };
78 match result {
79 Ok(()) => {
80 jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
81 succeeded += 1;
82 }
83 Err(e) => {
84 warn!(job = job.job_id.as_str(), error = %e, "job failed");
85 jobs.set_status(&job.job_id, JobStatus::Failed)?;
86 }
87 }
88 processed += 1;
89 }
90 Ok(succeeded)
91}