qml_rs/processing/
scheduler.rs

1//! Job scheduler for delayed and recurring jobs
2//!
3//! This module contains the JobScheduler that handles scheduling jobs for
4//! future execution and managing recurring job patterns.
5
6use chrono::{DateTime, Duration, Utc};
7use std::sync::Arc;
8use tokio::time::interval;
9use tracing::{debug, error, info};
10
11use crate::core::{Job, JobState};
12use crate::error::{QmlError, Result};
13use crate::storage::Storage;
14
15/// Job scheduler for managing delayed and recurring jobs
16pub struct JobScheduler {
17    storage: Arc<dyn Storage>,
18    poll_interval: Duration,
19}
20
21impl JobScheduler {
22    /// Create a new job scheduler
23    pub fn new(storage: Arc<dyn Storage>) -> Self {
24        Self {
25            storage,
26            poll_interval: Duration::seconds(30), // Check every 30 seconds by default
27        }
28    }
29
30    /// Create a new job scheduler with custom poll interval
31    pub fn with_poll_interval(storage: Arc<dyn Storage>, poll_interval: Duration) -> Self {
32        Self {
33            storage,
34            poll_interval,
35        }
36    }
37
38    /// Start the scheduler loop
39    pub async fn run(&self) -> Result<()> {
40        info!(
41            "Starting job scheduler with poll interval: {:?}",
42            self.poll_interval
43        );
44
45        let mut interval =
46            interval(
47                self.poll_interval
48                    .to_std()
49                    .map_err(|e| QmlError::ConfigurationError {
50                        message: format!("Invalid poll interval: {}", e),
51                    })?,
52            );
53
54        loop {
55            interval.tick().await;
56
57            if let Err(e) = self.process_scheduled_jobs().await {
58                error!("Error processing scheduled jobs: {}", e);
59            }
60
61            if let Err(e) = self.process_retry_jobs().await {
62                error!("Error processing retry jobs: {}", e);
63            }
64        }
65    }
66
67    /// Process jobs that are scheduled for execution
68    async fn process_scheduled_jobs(&self) -> Result<()> {
69        debug!("Checking for scheduled jobs ready for execution");
70
71        let now = Utc::now();
72
73        // Get all scheduled jobs
74        let scheduled_state = JobState::scheduled(now, "check");
75        let jobs = self
76            .storage
77            .list(Some(&scheduled_state), None, None)
78            .await
79            .map_err(|e| QmlError::StorageError {
80                message: format!("Failed to list scheduled jobs: {}", e),
81            })?;
82
83        let mut ready_jobs = Vec::new();
84
85        for job in jobs {
86            if let JobState::Scheduled { enqueue_at, .. } = &job.state {
87                if *enqueue_at <= now {
88                    ready_jobs.push(job);
89                }
90            }
91        }
92
93        debug!(
94            "Found {} scheduled jobs ready for execution",
95            ready_jobs.len()
96        );
97
98        // Enqueue ready jobs
99        for mut job in ready_jobs {
100            info!("Enqueueing scheduled job: {}", job.id);
101
102            let enqueued_state = JobState::enqueued(&job.queue);
103            if let Err(e) = job.set_state(enqueued_state) {
104                error!("Failed to set job state to Enqueued: {}", e);
105                continue;
106            }
107
108            if let Err(e) = self.storage.update(&job).await {
109                error!("Failed to update job in storage: {}", e);
110            }
111        }
112
113        Ok(())
114    }
115
116    /// Process jobs that are awaiting retry
117    async fn process_retry_jobs(&self) -> Result<()> {
118        debug!("Checking for jobs ready for retry");
119
120        let now = Utc::now();
121
122        // Get all jobs awaiting retry
123        let retry_state = JobState::awaiting_retry(now, 1, "check");
124        let jobs = self
125            .storage
126            .list(Some(&retry_state), None, None)
127            .await
128            .map_err(|e| QmlError::StorageError {
129                message: format!("Failed to list retry jobs: {}", e),
130            })?;
131
132        let mut ready_jobs = Vec::new();
133
134        for job in jobs {
135            if let JobState::AwaitingRetry { retry_at, .. } = &job.state {
136                if *retry_at <= now {
137                    ready_jobs.push(job);
138                }
139            }
140        }
141
142        debug!("Found {} retry jobs ready for execution", ready_jobs.len());
143
144        // Enqueue ready retry jobs
145        for mut job in ready_jobs {
146            info!("Enqueueing retry job: {}", job.id);
147
148            let enqueued_state = JobState::enqueued(&job.queue);
149            if let Err(e) = job.set_state(enqueued_state) {
150                error!("Failed to set job state to Enqueued: {}", e);
151                continue;
152            }
153
154            if let Err(e) = self.storage.update(&job).await {
155                error!("Failed to update job in storage: {}", e);
156            }
157        }
158
159        Ok(())
160    }
161
162    /// Schedule a job for future execution
163    pub async fn schedule_job(
164        &self,
165        mut job: Job,
166        execute_at: DateTime<Utc>,
167        reason: impl Into<String>,
168    ) -> Result<()> {
169        let scheduled_state = JobState::scheduled(execute_at, reason);
170
171        job.set_state(scheduled_state)?;
172
173        self.storage
174            .enqueue(&job)
175            .await
176            .map_err(|e| QmlError::StorageError {
177                message: format!("Failed to schedule job: {}", e),
178            })?;
179
180        info!("Scheduled job {} for execution at {}", job.id, execute_at);
181        Ok(())
182    }
183
184    /// Schedule a job with a delay from now
185    pub async fn schedule_job_in(
186        &self,
187        job: Job,
188        delay: Duration,
189        reason: impl Into<String>,
190    ) -> Result<()> {
191        let execute_at = Utc::now() + delay;
192        self.schedule_job(job, execute_at, reason).await
193    }
194
195    /// Get the current poll interval
196    pub fn poll_interval(&self) -> Duration {
197        self.poll_interval
198    }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::MemoryStorage;

    /// Build a default scheduler backed by a fresh in-memory store and return
    /// both, so tests can inspect what the scheduler persisted.
    fn scheduler_with_memory_storage() -> (Arc<MemoryStorage>, JobScheduler) {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage.clone());
        (storage, scheduler)
    }

    /// The job every test in this module schedules.
    fn sample_job() -> Job {
        Job::new("test_method", vec!["arg1".to_string()])
    }

    /// A job scheduled for the near future is stored in the `Scheduled` state.
    #[tokio::test]
    async fn test_schedule_job() {
        let (storage, scheduler) = scheduler_with_memory_storage();

        let job = sample_job();
        let job_id = job.id.clone();
        let run_at = Utc::now() + Duration::seconds(1);

        scheduler.schedule_job(job, run_at, "test").await.unwrap();

        let stored = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(stored.state, JobState::Scheduled { .. }));
    }

    /// A job whose scheduled time is already in the past is promoted to
    /// `Enqueued` by a single processing pass.
    #[tokio::test]
    async fn test_process_scheduled_jobs() {
        let (storage, scheduler) = scheduler_with_memory_storage();

        let job = sample_job();
        let job_id = job.id.clone();
        let already_due = Utc::now() - Duration::seconds(1);

        scheduler
            .schedule_job(job, already_due, "test")
            .await
            .unwrap();

        scheduler.process_scheduled_jobs().await.unwrap();

        let promoted = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(promoted.state, JobState::Enqueued { .. }));
    }

    /// Scheduling with a relative delay stores an `enqueue_at` in the future.
    #[tokio::test]
    async fn test_schedule_job_in() {
        let (storage, scheduler) = scheduler_with_memory_storage();

        let job = sample_job();
        let job_id = job.id.clone();

        scheduler
            .schedule_job_in(job, Duration::minutes(5), "delayed")
            .await
            .unwrap();

        let stored = storage.get(&job_id).await.unwrap().unwrap();
        match stored.state {
            JobState::Scheduled { enqueue_at, .. } => assert!(enqueue_at > Utc::now()),
            _ => panic!("Job should be in Scheduled state"),
        }
    }
}