1mod chunk_and_index;
18mod embedding;
19mod extract;
20pub mod storage;
21
22#[cfg(test)]
23mod tests;
24
25pub use chunk_and_index::ChunkAndIndexWorker;
26pub use embedding::EmbeddingWorker;
27pub use extract::ExtractionWorker;
28pub use storage::update_storage_accounting;
29
30use orbok_core::OrbokResult;
31use orbok_db::Catalog;
32use orbok_core::{JobStatus, JobType};
33use orbok_db::repo::IndexJobRepository;
34use tracing::warn;
35
36pub fn run_pending(
39 catalog: &Catalog,
40 extract_worker: &ExtractionWorker<'_>,
41 chunk_worker: &ChunkAndIndexWorker<'_>,
42 embed_worker: Option<&EmbeddingWorker<'_>>,
43 limit: u32,
44) -> OrbokResult<u64> {
45 let jobs = IndexJobRepository::new(catalog);
46 let mut succeeded = 0u64;
47 let mut processed = 0u32;
48
49 while processed < limit {
50 let batch = jobs.list_queued(1)?;
51 if batch.is_empty() {
52 break;
53 }
54 let job = &batch[0];
55 jobs.set_status(&job.job_id, JobStatus::Running)?;
56 let result = match job.job_type {
57 JobType::Extract => {
58 if let Some(file_id) = &job.file_id {
59 extract_worker.run(file_id)
60 } else {
61 Ok(())
62 }
63 }
64 JobType::Chunk | JobType::KeywordIndex => {
65 if let Some(file_id) = &job.file_id {
66 chunk_worker.run(file_id)
67 } else {
68 Ok(())
69 }
70 }
71 JobType::Embedding => {
72 if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
73 worker.run(file_id)
74 } else {
75 Ok(())
76 }
77 }
78 _ => Ok(()), };
80 match result {
81 Ok(()) => {
82 jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
83 succeeded += 1;
84 }
85 Err(e) => {
86 warn!(job = job.job_id.as_str(), error = %e, "job failed");
87 jobs.set_status(&job.job_id, JobStatus::Failed)?;
88 }
89 }
90 processed += 1;
91 }
92 Ok(succeeded)
93}