Skip to main content

qml_rs/processing/
scheduler.rs

1//! Job scheduler for delayed and recurring jobs
2//!
3//! This module contains the JobScheduler that handles scheduling jobs for
4//! future execution and managing recurring job patterns.
5
6use chrono::{DateTime, Duration, Utc};
7use std::future::Future;
8use std::sync::Arc;
9use tokio::time::interval;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, error, info};
12
13use crate::core::{Job, JobState};
14use crate::error::{QmlError, Result};
15use crate::storage::prelude::*;
16use crate::storage::{Storage, StorageError};
17
/// Upper bound on how many due jobs one claim call may promote during a
/// single scheduler tick, so an accumulated backlog cannot turn one tick
/// into an unbounded amount of enqueue work.
const DEFAULT_SCHEDULER_BATCH_SIZE: usize = 1_000;
21
22/// Job scheduler for managing delayed and recurring jobs
23pub struct JobScheduler {
24    storage: Arc<dyn Storage>,
25    poll_interval: Duration,
26    batch_size: usize,
27}
28
29impl JobScheduler {
30    /// Create a new job scheduler
31    pub fn new(storage: Arc<dyn Storage>) -> Self {
32        Self {
33            storage,
34            poll_interval: Duration::seconds(30), // Check every 30 seconds by default
35            batch_size: DEFAULT_SCHEDULER_BATCH_SIZE,
36        }
37    }
38
39    /// Create a new job scheduler with custom poll interval
40    pub fn with_poll_interval(storage: Arc<dyn Storage>, poll_interval: Duration) -> Self {
41        Self {
42            storage,
43            poll_interval,
44            batch_size: DEFAULT_SCHEDULER_BATCH_SIZE,
45        }
46    }
47
48    /// Start the scheduler loop. Runs forever; use
49    /// [`JobScheduler::run_until_cancelled`] when you need to observe a
50    /// shutdown signal.
51    pub async fn run(&self) -> Result<()> {
52        self.run_until_cancelled(CancellationToken::new()).await
53    }
54
55    /// Start the scheduler loop, exiting cleanly when `cancel` is cancelled.
56    pub async fn run_until_cancelled(&self, cancel: CancellationToken) -> Result<()> {
57        info!(
58            "Starting job scheduler with poll interval: {:?}",
59            self.poll_interval
60        );
61
62        let mut interval =
63            interval(
64                self.poll_interval
65                    .to_std()
66                    .map_err(|e| QmlError::ConfigurationError {
67                        message: format!("Invalid poll interval: {}", e),
68                    })?,
69            );
70
71        loop {
72            tokio::select! {
73                biased;
74                _ = cancel.cancelled() => {
75                    debug!("Scheduler loop exiting on cancellation");
76                    return Ok(());
77                }
78                _ = interval.tick() => {}
79            }
80
81            if let Err(e) = self
82                .process_due_jobs("scheduled", |storage, now, limit| async move {
83                    storage.claim_due_scheduled_jobs(now, limit).await
84                })
85                .await
86            {
87                error!("Error processing scheduled jobs: {}", e);
88            }
89
90            if let Err(e) = self
91                .process_due_jobs("retry", |storage, now, limit| async move {
92                    storage.claim_due_retry_jobs(now, limit).await
93                })
94                .await
95            {
96                error!("Error processing retry jobs: {}", e);
97            }
98        }
99    }
100
101    /// Drain a single category of due jobs into the `Enqueued` state.
102    ///
103    /// `claim` is a backend-specific atomic primitive — it both selects due
104    /// jobs and transitions them to `Enqueued` in one operation, so two
105    /// schedulers running against the same backend can't both promote the
106    /// same job. The scheduler used to do the transition itself with a
107    /// follow-up `update`; that pattern was racy under multi-server.
108    async fn process_due_jobs<F, Fut>(&self, kind: &str, claim: F) -> Result<()>
109    where
110        F: FnOnce(Arc<dyn Storage>, DateTime<Utc>, usize) -> Fut,
111        Fut: Future<Output = std::result::Result<Vec<Job>, StorageError>>,
112    {
113        debug!("Checking for {} jobs ready for execution", kind);
114
115        let now = Utc::now();
116        let claimed_jobs = claim(self.storage.clone(), now, self.batch_size)
117            .await
118            .map_err(|e| QmlError::StorageError {
119                message: format!("Failed to claim due {} jobs: {}", kind, e),
120            })?;
121
122        if !claimed_jobs.is_empty() {
123            info!(
124                "Promoted {} {} job(s) to Enqueued",
125                claimed_jobs.len(),
126                kind
127            );
128        }
129        Ok(())
130    }
131
132    /// Schedule a job for future execution
133    pub async fn schedule_job(
134        &self,
135        mut job: Job,
136        execute_at: DateTime<Utc>,
137        reason: impl Into<String>,
138    ) -> Result<()> {
139        let scheduled_state = JobState::scheduled(execute_at, reason);
140
141        job.set_state(scheduled_state)?;
142
143        self.storage
144            .enqueue(&job)
145            .await
146            .map_err(|e| QmlError::StorageError {
147                message: format!("Failed to schedule job: {}", e),
148            })?;
149
150        info!("Scheduled job {} for execution at {}", job.id, execute_at);
151        Ok(())
152    }
153
154    /// Schedule a job with a delay from now
155    pub async fn schedule_job_in(
156        &self,
157        job: Job,
158        delay: Duration,
159        reason: impl Into<String>,
160    ) -> Result<()> {
161        let execute_at = Utc::now() + delay;
162        self.schedule_job(job, execute_at, reason).await
163    }
164
165    /// Get the current poll interval
166    pub fn poll_interval(&self) -> Duration {
167        self.poll_interval
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::{MemoryStorage, MonitoringApi};

    /// Job with the method name and arguments used by the scheduling tests.
    fn sample_job() -> Job {
        Job::new("test_method", serde_json::json!(["arg1".to_string()]))
    }

    /// No-op job placed in the `Scheduled` state for `enqueue_at`.
    fn scheduled_noop(enqueue_at: DateTime<Utc>, reason: &'static str) -> Job {
        let mut job = Job::new("noop", serde_json::Value::Null);
        job.set_state(JobState::scheduled(enqueue_at, reason))
            .unwrap();
        job
    }

    /// No-op job placed in `AwaitingRetry` for `retry_at`.
    ///
    /// AwaitingRetry is only reachable via Processing → AwaitingRetry, so the
    /// state field is assigned directly to bypass state validation.
    fn retry_noop(retry_at: DateTime<Utc>, reason: &'static str) -> Job {
        let mut job = Job::new("noop", serde_json::Value::Null);
        job.state = JobState::awaiting_retry(retry_at, reason);
        job
    }

    /// Run one "scheduled" pass of the scheduler. The Scheduled → Enqueued
    /// transition happens atomically inside `claim_due_scheduled_jobs`.
    async fn promote_scheduled(scheduler: &JobScheduler) {
        scheduler
            .process_due_jobs("scheduled", |storage, now, limit| async move {
                storage.claim_due_scheduled_jobs(now, limit).await
            })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_schedule_job() {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage.clone());

        let job = sample_job();
        let id = job.id.clone();
        let when = Utc::now() + Duration::seconds(1);

        scheduler.schedule_job(job, when, "test").await.unwrap();

        // The stored copy must be in the Scheduled state.
        let stored = storage.get(&id).await.unwrap().unwrap();
        assert!(matches!(stored.state, JobState::Scheduled { .. }));
    }

    #[tokio::test]
    async fn test_process_scheduled_jobs() {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage.clone());

        // A job whose execution time is already in the past.
        let job = sample_job();
        let id = job.id.clone();
        let when = Utc::now() - Duration::seconds(1);

        scheduler.schedule_job(job, when, "test").await.unwrap();

        promote_scheduled(&scheduler).await;

        // The job must now be enqueued.
        let promoted = storage.get(&id).await.unwrap().unwrap();
        assert!(matches!(promoted.state, JobState::Enqueued { .. }));
    }

    #[tokio::test]
    async fn fetch_due_scheduled_jobs_bounds_to_limit_and_past_due() {
        // Regression test for B1: scheduler must not drag every scheduled job
        // into memory when only a handful are due.
        let storage = Arc::new(MemoryStorage::new());

        // 1000 jobs scheduled far in the future.
        for _ in 0..1000 {
            let job = scheduled_noop(Utc::now() + Duration::hours(1), "future");
            storage.enqueue(&job).await.unwrap();
        }

        // 10 jobs already past due.
        let mut due_ids = Vec::with_capacity(10);
        for _ in 0..10 {
            let job = scheduled_noop(Utc::now() - Duration::seconds(5), "past");
            due_ids.push(job.id.clone());
            storage.enqueue(&job).await.unwrap();
        }

        let fetched = storage
            .fetch_due_scheduled_jobs(Utc::now(), 100)
            .await
            .unwrap();

        assert_eq!(
            fetched.len(),
            10,
            "storage should only return the 10 past-due jobs"
        );
        for candidate in &fetched {
            assert!(due_ids.contains(&candidate.id));
        }

        // Running the scheduler must transition exactly those 10 jobs.
        let scheduler = JobScheduler::new(storage.clone());
        promote_scheduled(&scheduler).await;

        for id in &due_ids {
            let promoted = storage.get(id).await.unwrap().unwrap();
            assert!(
                matches!(promoted.state, JobState::Enqueued { .. }),
                "job {} should have moved to Enqueued",
                id
            );
        }
    }

    #[tokio::test]
    async fn fetch_due_retry_jobs_filters_future_retries() {
        let storage = Arc::new(MemoryStorage::new());

        let not_yet_due = retry_noop(Utc::now() + Duration::minutes(10), "later");
        storage.enqueue(&not_yet_due).await.unwrap();

        let overdue = retry_noop(Utc::now() - Duration::seconds(1), "now");
        let overdue_id = overdue.id.clone();
        storage.enqueue(&overdue).await.unwrap();

        let fetched = storage.fetch_due_retry_jobs(Utc::now(), 100).await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].id, overdue_id);
    }

    #[tokio::test]
    async fn claim_due_scheduled_jobs_returns_enqueued_state_atomically() {
        // Regression for I9: claim_due_scheduled_jobs is the contract that
        // multi-server schedulers rely on to avoid double-promotion. Verify
        // it transitions Scheduled → Enqueued in a single call (no follow-up
        // update needed) and that a second call observes none of the
        // already-claimed jobs.
        let storage = Arc::new(MemoryStorage::new());

        let mut due_ids = Vec::new();
        for _ in 0..5 {
            let job = scheduled_noop(Utc::now() - Duration::seconds(5), "past");
            due_ids.push(job.id.clone());
            storage.enqueue(&job).await.unwrap();
        }

        let first_claim = storage
            .claim_due_scheduled_jobs(Utc::now(), 100)
            .await
            .unwrap();
        assert_eq!(first_claim.len(), 5, "all 5 due jobs must be claimed");
        for job in &first_claim {
            assert!(
                matches!(job.state, JobState::Enqueued { .. }),
                "claim returns jobs already in Enqueued state, got {:?}",
                job.state
            );
        }

        // A second call must come back empty: storage already moved those
        // jobs out of Scheduled. This is what protects multi-server
        // deployments from double-enqueue.
        let second_claim = storage
            .claim_due_scheduled_jobs(Utc::now(), 100)
            .await
            .unwrap();
        assert!(
            second_claim.is_empty(),
            "second claim must not re-pick already-promoted jobs"
        );

        // And the underlying rows are observably Enqueued.
        for id in &due_ids {
            let row = storage.get(id).await.unwrap().unwrap();
            assert!(matches!(row.state, JobState::Enqueued { .. }));
        }
    }

    #[tokio::test]
    async fn test_schedule_job_in() {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage.clone());

        let job = sample_job();
        let id = job.id.clone();

        scheduler
            .schedule_job_in(job, Duration::minutes(5), "delayed")
            .await
            .unwrap();

        // The stored job must be Scheduled with an enqueue time in the future.
        let stored = storage.get(&id).await.unwrap().unwrap();
        match stored.state {
            JobState::Scheduled { enqueue_at, .. } => assert!(enqueue_at > Utc::now()),
            _ => panic!("Job should be in Scheduled state"),
        }
    }
}