qml_rs/processing/
scheduler.rs1use chrono::{DateTime, Duration, Utc};
7use std::sync::Arc;
8use tokio::time::interval;
9use tracing::{debug, error, info};
10
11use crate::core::{Job, JobState};
12use crate::error::{QmlError, Result};
13use crate::storage::Storage;
14
15pub struct JobScheduler {
17 storage: Arc<dyn Storage>,
18 poll_interval: Duration,
19}
20
21impl JobScheduler {
22 pub fn new(storage: Arc<dyn Storage>) -> Self {
24 Self {
25 storage,
26 poll_interval: Duration::seconds(30), }
28 }
29
30 pub fn with_poll_interval(storage: Arc<dyn Storage>, poll_interval: Duration) -> Self {
32 Self {
33 storage,
34 poll_interval,
35 }
36 }
37
38 pub async fn run(&self) -> Result<()> {
40 info!(
41 "Starting job scheduler with poll interval: {:?}",
42 self.poll_interval
43 );
44
45 let mut interval =
46 interval(
47 self.poll_interval
48 .to_std()
49 .map_err(|e| QmlError::ConfigurationError {
50 message: format!("Invalid poll interval: {}", e),
51 })?,
52 );
53
54 loop {
55 interval.tick().await;
56
57 if let Err(e) = self.process_scheduled_jobs().await {
58 error!("Error processing scheduled jobs: {}", e);
59 }
60
61 if let Err(e) = self.process_retry_jobs().await {
62 error!("Error processing retry jobs: {}", e);
63 }
64 }
65 }
66
67 async fn process_scheduled_jobs(&self) -> Result<()> {
69 debug!("Checking for scheduled jobs ready for execution");
70
71 let now = Utc::now();
72
73 let scheduled_state = JobState::scheduled(now, "check");
75 let jobs = self
76 .storage
77 .list(Some(&scheduled_state), None, None)
78 .await
79 .map_err(|e| QmlError::StorageError {
80 message: format!("Failed to list scheduled jobs: {}", e),
81 })?;
82
83 let mut ready_jobs = Vec::new();
84
85 for job in jobs {
86 if let JobState::Scheduled { enqueue_at, .. } = &job.state {
87 if *enqueue_at <= now {
88 ready_jobs.push(job);
89 }
90 }
91 }
92
93 debug!(
94 "Found {} scheduled jobs ready for execution",
95 ready_jobs.len()
96 );
97
98 for mut job in ready_jobs {
100 info!("Enqueueing scheduled job: {}", job.id);
101
102 let enqueued_state = JobState::enqueued(&job.queue);
103 if let Err(e) = job.set_state(enqueued_state) {
104 error!("Failed to set job state to Enqueued: {}", e);
105 continue;
106 }
107
108 if let Err(e) = self.storage.update(&job).await {
109 error!("Failed to update job in storage: {}", e);
110 }
111 }
112
113 Ok(())
114 }
115
116 async fn process_retry_jobs(&self) -> Result<()> {
118 debug!("Checking for jobs ready for retry");
119
120 let now = Utc::now();
121
122 let retry_state = JobState::awaiting_retry(now, 1, "check");
124 let jobs = self
125 .storage
126 .list(Some(&retry_state), None, None)
127 .await
128 .map_err(|e| QmlError::StorageError {
129 message: format!("Failed to list retry jobs: {}", e),
130 })?;
131
132 let mut ready_jobs = Vec::new();
133
134 for job in jobs {
135 if let JobState::AwaitingRetry { retry_at, .. } = &job.state {
136 if *retry_at <= now {
137 ready_jobs.push(job);
138 }
139 }
140 }
141
142 debug!("Found {} retry jobs ready for execution", ready_jobs.len());
143
144 for mut job in ready_jobs {
146 info!("Enqueueing retry job: {}", job.id);
147
148 let enqueued_state = JobState::enqueued(&job.queue);
149 if let Err(e) = job.set_state(enqueued_state) {
150 error!("Failed to set job state to Enqueued: {}", e);
151 continue;
152 }
153
154 if let Err(e) = self.storage.update(&job).await {
155 error!("Failed to update job in storage: {}", e);
156 }
157 }
158
159 Ok(())
160 }
161
162 pub async fn schedule_job(
164 &self,
165 mut job: Job,
166 execute_at: DateTime<Utc>,
167 reason: impl Into<String>,
168 ) -> Result<()> {
169 let scheduled_state = JobState::scheduled(execute_at, reason);
170
171 job.set_state(scheduled_state)?;
172
173 self.storage
174 .enqueue(&job)
175 .await
176 .map_err(|e| QmlError::StorageError {
177 message: format!("Failed to schedule job: {}", e),
178 })?;
179
180 info!("Scheduled job {} for execution at {}", job.id, execute_at);
181 Ok(())
182 }
183
184 pub async fn schedule_job_in(
186 &self,
187 job: Job,
188 delay: Duration,
189 reason: impl Into<String>,
190 ) -> Result<()> {
191 let execute_at = Utc::now() + delay;
192 self.schedule_job(job, execute_at, reason).await
193 }
194
195 pub fn poll_interval(&self) -> Duration {
197 self.poll_interval
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::MemoryStorage;

    /// Builds an in-memory storage and a default scheduler sharing it.
    fn scheduler_with_storage() -> (Arc<MemoryStorage>, JobScheduler) {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage.clone());
        (storage, scheduler)
    }

    /// Builds the job used by every test and returns it with a copy of its id.
    fn sample_job() -> (Job, String) {
        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let id = job.id.clone();
        (job, id)
    }

    /// A job scheduled for the near future is stored in the `Scheduled` state.
    #[tokio::test]
    async fn test_schedule_job() {
        let (storage, scheduler) = scheduler_with_storage();
        let (job, job_id) = sample_job();

        let run_at = Utc::now() + Duration::seconds(1);
        scheduler.schedule_job(job, run_at, "test").await.unwrap();

        let stored = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(stored.state, JobState::Scheduled { .. }));
    }

    /// A job whose scheduled time has already passed is moved to `Enqueued`
    /// by one processing pass.
    #[tokio::test]
    async fn test_process_scheduled_jobs() {
        let (storage, scheduler) = scheduler_with_storage();
        let (job, job_id) = sample_job();

        let run_at = Utc::now() - Duration::seconds(1);
        scheduler.schedule_job(job, run_at, "test").await.unwrap();

        scheduler.process_scheduled_jobs().await.unwrap();

        let updated = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated.state, JobState::Enqueued { .. }));
    }

    /// Scheduling with a positive delay yields an enqueue time in the future.
    #[tokio::test]
    async fn test_schedule_job_in() {
        let (storage, scheduler) = scheduler_with_storage();
        let (job, job_id) = sample_job();

        scheduler
            .schedule_job_in(job, Duration::minutes(5), "delayed")
            .await
            .unwrap();

        let stored = storage.get(&job_id).await.unwrap().unwrap();
        match stored.state {
            JobState::Scheduled { enqueue_at, .. } => assert!(enqueue_at > Utc::now()),
            _ => panic!("Job should be in Scheduled state"),
        }
    }
}