1mod chunk_adapter;
22mod chunk_and_index;
23pub mod cleanup_service;
24mod embedding;
25mod extract;
26pub mod model_verifier;
27pub mod recovery;
28pub mod scheduler;
29pub mod storage;
30
31#[cfg(test)]
32mod tests;
33
34pub use chunk_and_index::ChunkAndIndexWorker;
35pub use cleanup_service::{CleanupService, FullCleanupOutcome};
36pub use embedding::EmbeddingWorker;
37pub use extract::ExtractionWorker;
38pub use model_verifier::{
39 FileIssue, FileIssueKind, VerifyOutcome, verify_embedding_model, verify_outcome_summary,
40};
41pub use recovery::{
42 IntegrityReport, RecoveryReport, check_catalog_integrity, run_startup_recovery,
43};
44pub use scheduler::{
45 IndexJob, JobKind, JobState, QueueCapacity, QueueKind, ResourceMode, Scheduler,
46 SchedulerConfig, SchedulerEvent, SchedulerLimits, WorkPriority,
47};
48pub use storage::update_storage_accounting;
49
50use orbok_core::OrbokResult;
51use orbok_core::{JobStatus, JobType};
52use orbok_db::Catalog;
53use orbok_db::repo::IndexJobRepository;
54use tracing::warn;
55
56pub fn run_pending(
63 catalog: &Catalog,
64 extract_worker: &ExtractionWorker<'_>,
65 chunk_worker: &ChunkAndIndexWorker<'_>,
66 embed_worker: Option<&EmbeddingWorker<'_>>,
67 limit: u32,
68) -> OrbokResult<u64> {
69 let jobs = IndexJobRepository::new(catalog);
70 let mut succeeded = 0u64;
71 let mut processed = 0u32;
72
73 while processed < limit {
74 let batch = jobs.list_queued(1)?;
75 if batch.is_empty() {
76 break;
77 }
78 let job = &batch[0];
79 jobs.set_status(&job.job_id, JobStatus::Running)?;
80 let result = match job.job_type {
81 JobType::Extract => {
82 if let Some(file_id) = &job.file_id {
83 extract_worker.run(file_id)
84 } else {
85 Ok(())
86 }
87 }
88 JobType::Chunk | JobType::KeywordIndex => {
89 if let Some(file_id) = &job.file_id {
90 chunk_worker.run(file_id)
91 } else {
92 Ok(())
93 }
94 }
95 JobType::Embedding => {
96 if let (Some(file_id), Some(worker)) = (&job.file_id, embed_worker) {
97 worker.run(file_id)
98 } else {
99 Ok(())
100 }
101 }
102 _ => Ok(()), };
104 match result {
105 Ok(()) => {
106 jobs.set_status(&job.job_id, JobStatus::Succeeded)?;
107 succeeded += 1;
108 }
109 Err(e) => {
110 warn!(job = job.job_id.as_str(), error = %e, "job failed");
111 jobs.set_status(&job.job_id, JobStatus::Failed)?;
112 }
113 }
114 processed += 1;
115 }
116 Ok(succeeded)
117}