mod chunk_and_index;
mod embedding;
mod extract;
pub mod storage;
#[cfg(test)]
mod tests;
pub use chunk_and_index::ChunkAndIndexWorker;
pub use embedding::EmbeddingWorker;
pub use extract::ExtractionWorker;
pub use storage::update_storage_accounting;
use orbok_core::OrbokResult;
use orbok_db::Catalog;
use orbok_core::{JobStatus, JobType};
use orbok_db::repo::IndexJobRepository;
use tracing::warn;
/// Runs up to `limit` queued index jobs, one at a time, and returns how many succeeded.
///
/// Each iteration fetches the next queued job from [`IndexJobRepository`], marks it
/// [`JobStatus::Running`], dispatches it to the worker for its [`JobType`], and then
/// records [`JobStatus::Succeeded`] or [`JobStatus::Failed`] depending on the worker's
/// result. The loop stops early as soon as the queue is empty.
///
/// Dispatch rules:
/// - `Extract` → `extract_worker`
/// - `Chunk` and `KeywordIndex` → `chunk_worker`
/// - `Embedding` → `embed_worker`; when `embed_worker` is `None` the job is marked
///   succeeded without running anything.
/// - Any other job type is marked succeeded without running anything.
///
/// A job of one of the file-scoped types above that carries no `file_id` cannot be
/// run; it is logged at `warn` level and marked succeeded.
///
/// # Errors
///
/// Returns an error if a repository call (`list_queued` or `set_status`) fails.
/// Worker failures are *not* returned: they are logged and the job is marked
/// [`JobStatus::Failed`]. If recording the final status fails, the function returns
/// early and that job is left in the `Running` state.
pub fn run_pending(
    catalog: &Catalog,
    extract_worker: &ExtractionWorker<'_>,
    chunk_worker: &ChunkAndIndexWorker<'_>,
    embed_worker: Option<&EmbeddingWorker<'_>>,
    limit: u32,
) -> OrbokResult<u64> {
    let jobs = IndexJobRepository::new(catalog);
    let mut succeeded = 0u64;

    for _ in 0..limit {
        // The queue is re-queried for a single job on every iteration instead of
        // fetching a batch up front.
        // NOTE(review): presumably so that jobs enqueued by a worker during this run
        // are picked up in queue order — confirm before switching to batch fetches.
        let batch = jobs.list_queued(1)?;
        let job = match batch.first() {
            Some(job) => job,
            None => break,
        };

        jobs.set_status(&job.job_id, JobStatus::Running)?;

        let result = match (&job.job_type, job.file_id.as_ref()) {
            (JobType::Extract, Some(file_id)) => extract_worker.run(file_id),
            (JobType::Chunk, Some(file_id)) | (JobType::KeywordIndex, Some(file_id)) => {
                chunk_worker.run(file_id)
            }
            (JobType::Embedding, Some(file_id)) => match embed_worker {
                Some(worker) => worker.run(file_id),
                // No embedding worker supplied: nothing runs and the job is still
                // marked succeeded below.
                None => Ok(()),
            },
            (JobType::Extract, None)
            | (JobType::Chunk, None)
            | (JobType::KeywordIndex, None)
            | (JobType::Embedding, None) => {
                // These job types only do work for a specific file. Without one there is
                // nothing to run; the job is still marked succeeded, but the skip is
                // logged so it is not indistinguishable from real work.
                warn!(job = job.job_id.as_str(), "job has no file_id; skipping");
                Ok(())
            }
            // Job types without a handler here are treated as no-ops.
            _ => Ok(()),
        };

        match result {
            Ok(()) => {
                jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
                succeeded += 1;
            }
            Err(e) => {
                // Worker errors are recorded on the job rather than aborting the run.
                warn!(job = job.job_id.as_str(), error = %e, "job failed");
                jobs.set_status(&job.job_id, JobStatus::Failed)?;
            }
        }
    }

    Ok(succeeded)
}