orbok_workers/scheduler/
dispatch.rs1use super::job::{IndexJob, JobState, ResourceMode, SchedulerEvent};
19use super::limits::{MAX_JOB_ATTEMPTS, SchedulerConfig};
20use super::queue::QueueSet;
21use orbok_core::{JobId, JobStatus, OrbokResult, SourceId, now_iso8601};
22use orbok_db::Catalog;
23use orbok_db::repo::IndexJobRepository;
24
25pub struct Scheduler {
27 #[allow(dead_code)] config: SchedulerConfig,
29 queues: QueueSet,
30 resource_mode: ResourceMode,
31 events: Vec<SchedulerEvent>,
33 completed_count: u64,
35 failed_count: u64,
37}
38
39impl Scheduler {
40 pub fn new(config: SchedulerConfig) -> Self {
41 Self {
42 queues: QueueSet::new(&config.capacity),
43 config,
44 resource_mode: ResourceMode::default(),
45 events: Vec::new(),
46 completed_count: 0,
47 failed_count: 0,
48 }
49 }
50
51 pub fn with_defaults() -> Self {
52 Self::new(SchedulerConfig::default())
53 }
54
55 pub fn notify_user_active(&mut self) {
60 if self.resource_mode != ResourceMode::Paused {
61 let changed = self.resource_mode != ResourceMode::UserActive;
62 self.resource_mode = ResourceMode::UserActive;
63 if changed {
64 self.emit(SchedulerEvent::UserActivityDetected);
65 self.emit(SchedulerEvent::ResourceModeChanged(
66 ResourceMode::UserActive,
67 ));
68 }
69 }
70 }
71
72 pub fn notify_user_idle(&mut self) {
74 if self.resource_mode == ResourceMode::UserActive {
75 self.resource_mode = ResourceMode::Normal;
76 self.emit(SchedulerEvent::ResourceModeChanged(ResourceMode::Normal));
77 }
78 }
79
80 pub fn resource_mode(&self) -> ResourceMode {
81 self.resource_mode
82 }
83
84 pub fn pause(&mut self, catalog: &Catalog) -> OrbokResult<()> {
89 if self.resource_mode == ResourceMode::Paused {
90 return Ok(());
91 }
92 self.resource_mode = ResourceMode::Paused;
93 self.emit(SchedulerEvent::ResourceModeChanged(ResourceMode::Paused));
94 let conn = catalog.lock();
96 conn.execute(
97 "UPDATE index_jobs SET status = 'paused', updated_at = ?1 WHERE status = 'queued'",
98 rusqlite::params![now_iso8601()],
99 )
100 .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
101 Ok(())
102 }
103
104 pub fn resume(&mut self, catalog: &Catalog) -> OrbokResult<()> {
106 if self.resource_mode != ResourceMode::Paused {
107 return Ok(());
108 }
109 self.resource_mode = ResourceMode::Normal;
110 self.emit(SchedulerEvent::ResourceModeChanged(ResourceMode::Normal));
111 let conn = catalog.lock();
113 conn.execute(
114 "UPDATE index_jobs SET status = 'queued', updated_at = ?1 WHERE status = 'paused'",
115 rusqlite::params![now_iso8601()],
116 )
117 .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
118 Ok(())
119 }
120
121 pub fn cancel_source(&mut self, source_id: &SourceId, catalog: &Catalog) -> OrbokResult<usize> {
124 let cancelled = self.queues.cancel_source(source_id);
125 let conn = catalog.lock();
127 conn.execute(
128 "UPDATE index_jobs SET status = 'canceled', updated_at = ?1 \
129 WHERE source_id = ?2 AND status IN ('queued','paused')",
130 rusqlite::params![now_iso8601(), source_id.as_str()],
131 )
132 .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
133 Ok(cancelled)
134 }
135
136 pub fn enqueue(&mut self, job: IndexJob, catalog: &Catalog) -> OrbokResult<()> {
144 let queue = self.queues.queue_for(job.kind);
145 if queue.is_full() {
146 let kind = queue.kind();
147 if !queue.backpressure_active {
148 queue.backpressure_active = true;
149 self.events
150 .push(SchedulerEvent::QueueBackpressureApplied(kind));
151 }
152 return Err(orbok_core::OrbokError::BackpressureActive {
153 queue: format!("{kind:?}"),
154 });
155 }
156 if queue.backpressure_active && !queue.is_full() {
158 queue.backpressure_active = false;
159 let kind = queue.kind();
160 self.events
161 .push(SchedulerEvent::QueueBackpressureReleased(kind));
162 }
163 let jobs = IndexJobRepository::new(catalog);
165 jobs.enqueue_with_priority(
166 job.kind.as_job_type(),
167 Some(&job.source_id),
168 job.file_id.as_ref(),
169 job.priority.as_i64(),
170 )?;
171 let id = job.id.clone();
172 self.queues.queue_for(job.kind).push(job);
173 self.emit(SchedulerEvent::JobQueued(id));
174 Ok(())
175 }
176
177 pub fn tick(&mut self) -> Option<IndexJob> {
185 if self.resource_mode == ResourceMode::Paused {
186 return None;
187 }
188 let job = self.queues.pop_next(self.resource_mode)?;
189
190 let queue = self.queues.queue_for(job.kind);
192 if queue.backpressure_active && !queue.is_full() {
193 queue.backpressure_active = false;
194 let kind = queue.kind();
195 self.events
196 .push(SchedulerEvent::QueueBackpressureReleased(kind));
197 }
198
199 self.emit(SchedulerEvent::JobStarted(job.id.clone()));
200 Some(job)
201 }
202
203 pub fn complete(&mut self, job_id: &JobId, catalog: &Catalog) -> OrbokResult<()> {
205 let jobs = IndexJobRepository::new(catalog);
206 jobs.set_status(job_id, JobStatus::Succeeded)?;
207 self.completed_count += 1;
208 self.emit(SchedulerEvent::JobCompleted(job_id.clone()));
209 self.emit_readiness(catalog);
210 Ok(())
211 }
212
213 pub fn fail(
215 &mut self,
216 mut job: IndexJob,
217 error_kind: &str,
218 catalog: &Catalog,
219 ) -> OrbokResult<()> {
220 job.attempt_count += 1;
221 job.last_error_kind = Some(error_kind.to_string());
222 if job.attempt_count < MAX_JOB_ATTEMPTS {
223 tracing::debug!(
225 job = job.id.as_str(),
226 attempt = job.attempt_count,
227 error = error_kind,
228 "job failed — will retry"
229 );
230 job.state = JobState::Pending;
231 let id = job.id.clone();
232 let queue = self.queues.queue_for(job.kind);
233 if !queue.is_full() {
234 queue.push(job);
235 }
236 let jobs = IndexJobRepository::new(catalog);
237 jobs.set_status(&id, JobStatus::Queued)?;
238 jobs.increment_attempt(&id, error_kind)?;
239 } else {
240 tracing::warn!(
241 job = job.id.as_str(),
242 attempts = job.attempt_count,
243 error = error_kind,
244 "job permanently failed after max attempts"
245 );
246 let jobs = IndexJobRepository::new(catalog);
247 jobs.set_status(&job.id, JobStatus::Failed)?;
248 self.failed_count += 1;
249 self.emit(SchedulerEvent::JobFailed {
250 id: job.id.clone(),
251 error_kind: error_kind.to_string(),
252 });
253 }
254 Ok(())
255 }
256
257 pub fn pending_count(&self) -> usize {
260 self.queues.total_pending()
261 }
262
263 pub fn completed_count(&self) -> u64 {
264 self.completed_count
265 }
266
267 pub fn failed_count(&self) -> u64 {
268 self.failed_count
269 }
270
271 pub fn is_idle(&self) -> bool {
272 self.queues.total_pending() == 0
273 }
274
275 pub fn drain_events(&mut self) -> Vec<SchedulerEvent> {
280 std::mem::take(&mut self.events)
281 }
282
283 fn emit(&mut self, event: SchedulerEvent) {
284 self.events.push(event);
285 }
286
287 fn emit_readiness(&mut self, catalog: &Catalog) {
288 let conn = catalog.lock();
290 let ready: i64 = conn
291 .query_row(
292 "SELECT COUNT(*) FROM files WHERE file_status = 'indexed'",
293 [],
294 |r| r.get(0),
295 )
296 .unwrap_or(0);
297 let pending = self.pending_count() as u64;
298 self.events.push(SchedulerEvent::PartialReadinessChanged {
299 ready_count: ready as u64,
300 pending_count: pending,
301 });
302 }
303}