1use chrono::{DateTime, Duration, Utc};
7use std::future::Future;
8use std::sync::Arc;
9use tokio::time::interval;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, error, info};
12
13use crate::core::{Job, JobState};
14use crate::error::{QmlError, Result};
15use crate::storage::prelude::*;
16use crate::storage::{Storage, StorageError};
17
/// Default upper bound on how many jobs of each kind (scheduled / retry) a
/// single polling pass claims from storage.
const DEFAULT_SCHEDULER_BATCH_SIZE: usize = 1_000;
21
22pub struct JobScheduler {
24 storage: Arc<dyn Storage>,
25 poll_interval: Duration,
26 batch_size: usize,
27}
28
29impl JobScheduler {
30 pub fn new(storage: Arc<dyn Storage>) -> Self {
32 Self {
33 storage,
34 poll_interval: Duration::seconds(30), batch_size: DEFAULT_SCHEDULER_BATCH_SIZE,
36 }
37 }
38
39 pub fn with_poll_interval(storage: Arc<dyn Storage>, poll_interval: Duration) -> Self {
41 Self {
42 storage,
43 poll_interval,
44 batch_size: DEFAULT_SCHEDULER_BATCH_SIZE,
45 }
46 }
47
48 pub async fn run(&self) -> Result<()> {
52 self.run_until_cancelled(CancellationToken::new()).await
53 }
54
55 pub async fn run_until_cancelled(&self, cancel: CancellationToken) -> Result<()> {
57 info!(
58 "Starting job scheduler with poll interval: {:?}",
59 self.poll_interval
60 );
61
62 let mut interval =
63 interval(
64 self.poll_interval
65 .to_std()
66 .map_err(|e| QmlError::ConfigurationError {
67 message: format!("Invalid poll interval: {}", e),
68 })?,
69 );
70
71 loop {
72 tokio::select! {
73 biased;
74 _ = cancel.cancelled() => {
75 debug!("Scheduler loop exiting on cancellation");
76 return Ok(());
77 }
78 _ = interval.tick() => {}
79 }
80
81 if let Err(e) = self
82 .process_due_jobs("scheduled", |storage, now, limit| async move {
83 storage.claim_due_scheduled_jobs(now, limit).await
84 })
85 .await
86 {
87 error!("Error processing scheduled jobs: {}", e);
88 }
89
90 if let Err(e) = self
91 .process_due_jobs("retry", |storage, now, limit| async move {
92 storage.claim_due_retry_jobs(now, limit).await
93 })
94 .await
95 {
96 error!("Error processing retry jobs: {}", e);
97 }
98 }
99 }
100
101 async fn process_due_jobs<F, Fut>(&self, kind: &str, claim: F) -> Result<()>
109 where
110 F: FnOnce(Arc<dyn Storage>, DateTime<Utc>, usize) -> Fut,
111 Fut: Future<Output = std::result::Result<Vec<Job>, StorageError>>,
112 {
113 debug!("Checking for {} jobs ready for execution", kind);
114
115 let now = Utc::now();
116 let claimed_jobs = claim(self.storage.clone(), now, self.batch_size)
117 .await
118 .map_err(|e| QmlError::StorageError {
119 message: format!("Failed to claim due {} jobs: {}", kind, e),
120 })?;
121
122 if !claimed_jobs.is_empty() {
123 info!(
124 "Promoted {} {} job(s) to Enqueued",
125 claimed_jobs.len(),
126 kind
127 );
128 }
129 Ok(())
130 }
131
132 pub async fn schedule_job(
134 &self,
135 mut job: Job,
136 execute_at: DateTime<Utc>,
137 reason: impl Into<String>,
138 ) -> Result<()> {
139 let scheduled_state = JobState::scheduled(execute_at, reason);
140
141 job.set_state(scheduled_state)?;
142
143 self.storage
144 .enqueue(&job)
145 .await
146 .map_err(|e| QmlError::StorageError {
147 message: format!("Failed to schedule job: {}", e),
148 })?;
149
150 info!("Scheduled job {} for execution at {}", job.id, execute_at);
151 Ok(())
152 }
153
154 pub async fn schedule_job_in(
156 &self,
157 job: Job,
158 delay: Duration,
159 reason: impl Into<String>,
160 ) -> Result<()> {
161 let execute_at = Utc::now() + delay;
162 self.schedule_job(job, execute_at, reason).await
163 }
164
165 pub fn poll_interval(&self) -> Duration {
167 self.poll_interval
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::{MemoryStorage, MonitoringApi};

    /// Builds a `noop` job already moved into the `Scheduled` state for `enqueue_at`.
    fn scheduled_noop(enqueue_at: DateTime<Utc>, reason: &str) -> Job {
        let mut job = Job::new("noop", serde_json::Value::Null);
        job.set_state(JobState::scheduled(enqueue_at, reason))
            .unwrap();
        job
    }

    /// Runs a single scheduled-job promotion pass, panicking on error.
    async fn promote_due_scheduled(scheduler: &JobScheduler) {
        scheduler
            .process_due_jobs("scheduled", |storage, now, limit| async move {
                storage.claim_due_scheduled_jobs(now, limit).await
            })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_schedule_job() {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage.clone());

        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        let job_id = job.id.clone();
        let execute_at = Utc::now() + Duration::seconds(1);

        scheduler
            .schedule_job(job, execute_at, "test")
            .await
            .unwrap();

        let stored_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(stored_job.state, JobState::Scheduled { .. }));
    }

    #[tokio::test]
    async fn test_process_scheduled_jobs() {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage.clone());

        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        let job_id = job.id.clone();
        let execute_at = Utc::now() - Duration::seconds(1);

        scheduler
            .schedule_job(job, execute_at, "test")
            .await
            .unwrap();

        promote_due_scheduled(&scheduler).await;

        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Enqueued { .. }));
    }

    #[tokio::test]
    async fn fetch_due_scheduled_jobs_bounds_to_limit_and_past_due() {
        let storage = Arc::new(MemoryStorage::new());

        // A large population of not-yet-due jobs that must never be returned.
        for _ in 0..1000 {
            let job = scheduled_noop(Utc::now() + Duration::hours(1), "future");
            storage.enqueue(&job).await.unwrap();
        }

        let mut due_ids = Vec::with_capacity(10);
        for _ in 0..10 {
            let job = scheduled_noop(Utc::now() - Duration::seconds(5), "past");
            due_ids.push(job.id.clone());
            storage.enqueue(&job).await.unwrap();
        }

        let due = storage
            .fetch_due_scheduled_jobs(Utc::now(), 100)
            .await
            .unwrap();
        assert_eq!(
            due.len(),
            10,
            "storage should only return the 10 past-due jobs"
        );
        assert!(due.iter().all(|job| due_ids.contains(&job.id)));

        // With more jobs due than requested, the limit must cap the result.
        let limited = storage
            .fetch_due_scheduled_jobs(Utc::now(), 3)
            .await
            .unwrap();
        assert_eq!(
            limited.len(),
            3,
            "storage should cap results at the requested limit"
        );
        assert!(limited.iter().all(|job| due_ids.contains(&job.id)));

        let scheduler = JobScheduler::new(storage.clone());
        promote_due_scheduled(&scheduler).await;

        for id in &due_ids {
            let job = storage.get(id).await.unwrap().unwrap();
            assert!(
                matches!(job.state, JobState::Enqueued { .. }),
                "job {} should have moved to Enqueued",
                id
            );
        }
    }

    #[tokio::test]
    async fn fetch_due_retry_jobs_filters_future_retries() {
        let storage = Arc::new(MemoryStorage::new());

        let mut future_retry = Job::new("noop", serde_json::Value::Null);
        future_retry.state = JobState::awaiting_retry(Utc::now() + Duration::minutes(10), "later");
        storage.enqueue(&future_retry).await.unwrap();

        let mut due_retry = Job::new("noop", serde_json::Value::Null);
        due_retry.state = JobState::awaiting_retry(Utc::now() - Duration::seconds(1), "now");
        let due_id = due_retry.id.clone();
        storage.enqueue(&due_retry).await.unwrap();

        let due = storage.fetch_due_retry_jobs(Utc::now(), 100).await.unwrap();
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].id, due_id);
    }

    #[tokio::test]
    async fn claim_due_scheduled_jobs_returns_enqueued_state_atomically() {
        let storage = Arc::new(MemoryStorage::new());

        let mut due_ids = Vec::new();
        for _ in 0..5 {
            let job = scheduled_noop(Utc::now() - Duration::seconds(5), "past");
            due_ids.push(job.id.clone());
            storage.enqueue(&job).await.unwrap();
        }

        let claimed = storage
            .claim_due_scheduled_jobs(Utc::now(), 100)
            .await
            .unwrap();
        assert_eq!(claimed.len(), 5, "all 5 due jobs must be claimed");
        for job in &claimed {
            assert!(
                matches!(job.state, JobState::Enqueued { .. }),
                "claim returns jobs already in Enqueued state, got {:?}",
                job.state
            );
        }

        let again = storage
            .claim_due_scheduled_jobs(Utc::now(), 100)
            .await
            .unwrap();
        assert!(
            again.is_empty(),
            "second claim must not re-pick already-promoted jobs"
        );

        for id in &due_ids {
            let job = storage.get(id).await.unwrap().unwrap();
            assert!(matches!(job.state, JobState::Enqueued { .. }));
        }
    }

    #[tokio::test]
    async fn test_schedule_job_in() {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage.clone());

        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        let job_id = job.id.clone();

        scheduler
            .schedule_job_in(job, Duration::minutes(5), "delayed")
            .await
            .unwrap();

        let stored_job = storage.get(&job_id).await.unwrap().unwrap();
        if let JobState::Scheduled { enqueue_at, .. } = stored_job.state {
            assert!(enqueue_at > Utc::now());
        } else {
            panic!("Job should be in Scheduled state");
        }
    }
}