Skip to main content

orbok_workers/scheduler/
dispatch.rs

//! Resource-aware indexing scheduler (RFC-036 §5–§16).
//!
//! The `Scheduler` is the single entry point for all background
//! indexing work. It:
//!
//! - routes jobs into typed bounded queues;
//! - enforces backpressure when queues are full;
//! - tracks resource mode (`Normal` / `UserActive` / `Paused`);
//! - dispatches one job per `tick()` call, skipping embedding in
//!   `UserActive` mode (RFC-036 §9.2);
//! - emits `SchedulerEvent`s the app layer uses to update the UI;
//! - persists job state to the catalog for crash recovery (RFC-036 §16).
//!
//! Execution model: synchronous, single-threaded. This matches the
//! existing `run_pending` model and the snora/iced GUI thread contract.
//! Async dispatch can be layered on top in a future RFC.
17
18use super::job::{IndexJob, JobState, ResourceMode, SchedulerEvent};
19use super::limits::{MAX_JOB_ATTEMPTS, SchedulerConfig};
20use super::queue::QueueSet;
21use orbok_core::{JobId, JobStatus, OrbokResult, SourceId, now_iso8601};
22use orbok_db::Catalog;
23use orbok_db::repo::IndexJobRepository;
24
25/// The resource-aware scheduler (RFC-036 §5–§16).
26pub struct Scheduler {
27    #[allow(dead_code)] // reserved for future per-queue tuning
28    config: SchedulerConfig,
29    queues: QueueSet,
30    resource_mode: ResourceMode,
31    /// Events accumulated since the last `drain_events` call.
32    events: Vec<SchedulerEvent>,
33    /// Number of jobs that completed successfully in this session.
34    completed_count: u64,
35    /// Number of jobs that failed in this session.
36    failed_count: u64,
37}
38
39impl Scheduler {
40    pub fn new(config: SchedulerConfig) -> Self {
41        Self {
42            queues: QueueSet::new(&config.capacity),
43            config,
44            resource_mode: ResourceMode::default(),
45            events: Vec::new(),
46            completed_count: 0,
47            failed_count: 0,
48        }
49    }
50
51    pub fn with_defaults() -> Self {
52        Self::new(SchedulerConfig::default())
53    }
54
55    // ── Resource mode ───────────────────────────────────────────────────
56
57    /// Notify the scheduler that the user is actively searching or
58    /// typing. Background embedding will yield (RFC-036 §13.1).
59    pub fn notify_user_active(&mut self) {
60        if self.resource_mode != ResourceMode::Paused {
61            let changed = self.resource_mode != ResourceMode::UserActive;
62            self.resource_mode = ResourceMode::UserActive;
63            if changed {
64                self.emit(SchedulerEvent::UserActivityDetected);
65                self.emit(SchedulerEvent::ResourceModeChanged(
66                    ResourceMode::UserActive,
67                ));
68            }
69        }
70    }
71
72    /// Notify the scheduler that user activity has subsided.
73    pub fn notify_user_idle(&mut self) {
74        if self.resource_mode == ResourceMode::UserActive {
75            self.resource_mode = ResourceMode::Normal;
76            self.emit(SchedulerEvent::ResourceModeChanged(ResourceMode::Normal));
77        }
78    }
79
80    pub fn resource_mode(&self) -> ResourceMode {
81        self.resource_mode
82    }
83
84    // ── Pause / Resume / Cancel ─────────────────────────────────────────
85
86    /// Pause all background work (RFC-036 §12.1–§12.2).
87    /// In-flight jobs finish their current small unit; no new jobs start.
88    pub fn pause(&mut self, catalog: &Catalog) -> OrbokResult<()> {
89        if self.resource_mode == ResourceMode::Paused {
90            return Ok(());
91        }
92        self.resource_mode = ResourceMode::Paused;
93        self.emit(SchedulerEvent::ResourceModeChanged(ResourceMode::Paused));
94        // Persist paused state for recovery (RFC-036 §16).
95        let conn = catalog.lock();
96        conn.execute(
97            "UPDATE index_jobs SET status = 'paused', updated_at = ?1 WHERE status = 'queued'",
98            rusqlite::params![now_iso8601()],
99        )
100        .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
101        Ok(())
102    }
103
104    /// Resume background work after a pause (RFC-036 §12.2).
105    pub fn resume(&mut self, catalog: &Catalog) -> OrbokResult<()> {
106        if self.resource_mode != ResourceMode::Paused {
107            return Ok(());
108        }
109        self.resource_mode = ResourceMode::Normal;
110        self.emit(SchedulerEvent::ResourceModeChanged(ResourceMode::Normal));
111        // Restore paused jobs to queued so they are picked up again.
112        let conn = catalog.lock();
113        conn.execute(
114            "UPDATE index_jobs SET status = 'queued', updated_at = ?1 WHERE status = 'paused'",
115            rusqlite::params![now_iso8601()],
116        )
117        .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
118        Ok(())
119    }
120
121    /// Cancel all queued work for a source (RFC-036 §12.3).
122    /// Called when the user removes a folder.
123    pub fn cancel_source(&mut self, source_id: &SourceId, catalog: &Catalog) -> OrbokResult<usize> {
124        let cancelled = self.queues.cancel_source(source_id);
125        // Persist cancellation to catalog.
126        let conn = catalog.lock();
127        conn.execute(
128            "UPDATE index_jobs SET status = 'canceled', updated_at = ?1 \
129             WHERE source_id = ?2 AND status IN ('queued','paused')",
130            rusqlite::params![now_iso8601(), source_id.as_str()],
131        )
132        .map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
133        Ok(cancelled)
134    }
135
136    // ── Enqueue ─────────────────────────────────────────────────────────
137
138    /// Submit a job to the appropriate bounded queue (RFC-036 §7).
139    ///
140    /// Returns `Err` with a `BackpressureActive` variant if the target
141    /// queue is full — the caller must wait and retry rather than
142    /// allocating unbounded memory (RFC-036 §10.2).
143    pub fn enqueue(&mut self, job: IndexJob, catalog: &Catalog) -> OrbokResult<()> {
144        let queue = self.queues.queue_for(job.kind);
145        if queue.is_full() {
146            let kind = queue.kind();
147            if !queue.backpressure_active {
148                queue.backpressure_active = true;
149                self.events
150                    .push(SchedulerEvent::QueueBackpressureApplied(kind));
151            }
152            return Err(orbok_core::OrbokError::BackpressureActive {
153                queue: format!("{kind:?}"),
154            });
155        }
156        // If backpressure was active and the queue now has room, release it.
157        if queue.backpressure_active && !queue.is_full() {
158            queue.backpressure_active = false;
159            let kind = queue.kind();
160            self.events
161                .push(SchedulerEvent::QueueBackpressureReleased(kind));
162        }
163        // Persist job to catalog for crash recovery.
164        let jobs = IndexJobRepository::new(catalog);
165        jobs.enqueue_with_priority(
166            job.kind.as_job_type(),
167            Some(&job.source_id),
168            job.file_id.as_ref(),
169            job.priority.as_i64(),
170        )?;
171        let id = job.id.clone();
172        self.queues.queue_for(job.kind).push(job);
173        self.emit(SchedulerEvent::JobQueued(id));
174        Ok(())
175    }
176
177    // ── Dispatch ─────────────────────────────────────────────────────────
178
179    /// Dispatch one job from the queues (RFC-036 §8, §9).
180    ///
181    /// Returns the job to run, or `None` when paused or all queues are
182    /// empty. The caller executes the job and reports back via
183    /// `complete` or `fail`.
184    pub fn tick(&mut self) -> Option<IndexJob> {
185        if self.resource_mode == ResourceMode::Paused {
186            return None;
187        }
188        let job = self.queues.pop_next(self.resource_mode)?;
189
190        // Release backpressure on the queue that just yielded a slot.
191        let queue = self.queues.queue_for(job.kind);
192        if queue.backpressure_active && !queue.is_full() {
193            queue.backpressure_active = false;
194            let kind = queue.kind();
195            self.events
196                .push(SchedulerEvent::QueueBackpressureReleased(kind));
197        }
198
199        self.emit(SchedulerEvent::JobStarted(job.id.clone()));
200        Some(job)
201    }
202
203    /// Mark a job as successfully completed (RFC-036 §11).
204    pub fn complete(&mut self, job_id: &JobId, catalog: &Catalog) -> OrbokResult<()> {
205        let jobs = IndexJobRepository::new(catalog);
206        jobs.set_status(job_id, JobStatus::Succeeded)?;
207        self.completed_count += 1;
208        self.emit(SchedulerEvent::JobCompleted(job_id.clone()));
209        self.emit_readiness(catalog);
210        Ok(())
211    }
212
213    /// Mark a job as failed; retry if under the attempt limit (RFC-036 §11).
214    pub fn fail(
215        &mut self,
216        mut job: IndexJob,
217        error_kind: &str,
218        catalog: &Catalog,
219    ) -> OrbokResult<()> {
220        job.attempt_count += 1;
221        job.last_error_kind = Some(error_kind.to_string());
222        if job.attempt_count < MAX_JOB_ATTEMPTS {
223            // Re-queue for retry (RFC-036 §17.1 retry limit test).
224            tracing::debug!(
225                job = job.id.as_str(),
226                attempt = job.attempt_count,
227                error = error_kind,
228                "job failed — will retry"
229            );
230            job.state = JobState::Pending;
231            let id = job.id.clone();
232            let queue = self.queues.queue_for(job.kind);
233            if !queue.is_full() {
234                queue.push(job);
235            }
236            let jobs = IndexJobRepository::new(catalog);
237            jobs.set_status(&id, JobStatus::Queued)?;
238            jobs.increment_attempt(&id, error_kind)?;
239        } else {
240            tracing::warn!(
241                job = job.id.as_str(),
242                attempts = job.attempt_count,
243                error = error_kind,
244                "job permanently failed after max attempts"
245            );
246            let jobs = IndexJobRepository::new(catalog);
247            jobs.set_status(&job.id, JobStatus::Failed)?;
248            self.failed_count += 1;
249            self.emit(SchedulerEvent::JobFailed {
250                id: job.id.clone(),
251                error_kind: error_kind.to_string(),
252            });
253        }
254        Ok(())
255    }
256
257    // ── Progress ─────────────────────────────────────────────────────────
258
259    pub fn pending_count(&self) -> usize {
260        self.queues.total_pending()
261    }
262
263    pub fn completed_count(&self) -> u64 {
264        self.completed_count
265    }
266
267    pub fn failed_count(&self) -> u64 {
268        self.failed_count
269    }
270
271    pub fn is_idle(&self) -> bool {
272        self.queues.total_pending() == 0
273    }
274
275    // ── Events ────────────────────────────────────────────────────────────
276
277    /// Take all accumulated events since the last call.
278    /// The UI layer calls this on each frame to update progress copy.
279    pub fn drain_events(&mut self) -> Vec<SchedulerEvent> {
280        std::mem::take(&mut self.events)
281    }
282
283    fn emit(&mut self, event: SchedulerEvent) {
284        self.events.push(event);
285    }
286
287    fn emit_readiness(&mut self, catalog: &Catalog) {
288        // Count indexed files for partial readiness notice (RFC-036 §14.2).
289        let conn = catalog.lock();
290        let ready: i64 = conn
291            .query_row(
292                "SELECT COUNT(*) FROM files WHERE file_status = 'indexed'",
293                [],
294                |r| r.get(0),
295            )
296            .unwrap_or(0);
297        let pending = self.pending_count() as u64;
298        self.events.push(SchedulerEvent::PartialReadinessChanged {
299            ready_count: ready as u64,
300            pending_count: pending,
301        });
302    }
303}