aurora_db/workers/
queue.rs1use super::job::{Job, JobStatus};
2use crate::error::{AqlError, ErrorCode, Result};
3use crate::storage::ColdStore;
4use dashmap::DashMap;
5use std::sync::Arc;
6
7use tokio::sync::Notify;
8
9pub struct JobQueue {
11 jobs: Arc<DashMap<String, Job>>,
13 storage: Arc<ColdStore>,
15 collection: String,
17 notify: Arc<Notify>,
19}
20
21impl JobQueue {
22 pub fn new(storage_path: String) -> Result<Self> {
23 let storage = Arc::new(ColdStore::new(&storage_path)?);
24 let jobs = Arc::new(DashMap::new());
25 let notify = Arc::new(Notify::new());
26
27 let queue = Self {
28 jobs,
29 storage,
30 collection: "__aurora_jobs".to_string(),
31 notify,
32 };
33
34 queue.load_jobs()?;
36
37 Ok(queue)
38 }
39
40 fn load_jobs(&self) -> Result<()> {
42 let prefix = format!("{}:", self.collection);
44
45 for entry in self.storage.scan_prefix(&prefix) {
46 if let Ok((key, value)) = entry
47 && let Ok(job) = bincode::deserialize::<Job>(&value)
48 {
49 if !matches!(job.status, JobStatus::Completed) {
51 let job_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
52 self.jobs.insert(job_id, job);
53 }
54 }
55 }
56
57 Ok(())
58 }
59
60 pub async fn enqueue(&self, job: Job) -> Result<String> {
62 let job_id = job.id.clone();
63 let key = format!("{}:{}", self.collection, job_id);
64
65 let serialized = bincode::serialize(&job)
67 .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
68 self.storage.set(key, serialized)?;
69
70 self.jobs.insert(job_id.clone(), job);
72
73 self.notify.notify_waiters();
75
76 Ok(job_id)
77 }
78
79 pub async fn notified(&self) {
81 self.notify.notified().await;
82 }
83
84 pub fn notify_all(&self) {
86 self.notify.notify_waiters();
87 }
88
89 pub async fn dequeue(&self) -> Result<Option<Job>> {
91 let mut best_job: Option<(String, Job)> = None;
92
93 for entry in self.jobs.iter() {
94 let job = entry.value();
95
96 if !job.should_run() {
98 continue;
99 }
100
101 if !matches!(job.status, JobStatus::Pending | JobStatus::Failed { .. }) {
103 continue;
104 }
105
106 match &best_job {
108 None => {
109 best_job = Some((entry.key().clone(), job.clone()));
110 }
111 Some((_, current_best)) => {
112 if job.priority > current_best.priority
114 || (job.priority == current_best.priority
115 && job.created_at < current_best.created_at)
116 {
117 best_job = Some((entry.key().clone(), job.clone()));
118 }
119 }
120 }
121 }
122
123 if let Some((job_id, mut job)) = best_job {
124 job.mark_running();
126 self.update_job(&job_id, job.clone()).await?;
127 Ok(Some(job))
128 } else {
129 Ok(None)
130 }
131 }
132
133 pub async fn update_job(&self, job_id: &str, job: Job) -> Result<()> {
135 let key = format!("{}:{}", self.collection, job_id);
136
137 let serialized = bincode::serialize(&job)
139 .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
140 self.storage.set(key, serialized)?;
141
142 self.jobs.insert(job_id.to_string(), job);
144
145 Ok(())
146 }
147
148 pub async fn get(&self, job_id: &str) -> Result<Option<Job>> {
150 Ok(self.jobs.get(job_id).map(|j| j.clone()))
151 }
152
153 pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
155 Ok(self.jobs.get(job_id).map(|j| j.status.clone()))
156 }
157
158 pub async fn cleanup_completed(&self) -> Result<usize> {
160 let mut removed = 0;
161
162 let to_remove: Vec<String> = self
163 .jobs
164 .iter()
165 .filter(|entry| matches!(entry.value().status, JobStatus::Completed))
166 .map(|entry| entry.key().clone())
167 .collect();
168
169 for job_id in to_remove {
170 let key = format!("{}:{}", self.collection, job_id);
171 self.storage.delete(&key)?;
172 self.jobs.remove(&job_id);
173 removed += 1;
174 }
175
176 Ok(removed)
177 }
178
179 pub async fn stats(&self) -> Result<super::QueueStats> {
181 let mut stats = super::QueueStats {
182 pending: 0,
183 running: 0,
184 completed: 0,
185 failed: 0,
186 dead_letter: 0,
187 };
188
189 for entry in self.jobs.iter() {
190 match &entry.value().status {
191 JobStatus::Pending => stats.pending += 1,
192 JobStatus::Running => stats.running += 1,
193 JobStatus::Completed => stats.completed += 1,
194 JobStatus::Failed { .. } => stats.failed += 1,
195 JobStatus::DeadLetter { .. } => stats.dead_letter += 1,
196 }
197 }
198
199 Ok(stats)
200 }
201
202 pub async fn find_zombie_jobs(&self) -> Vec<String> {
207 self.jobs
208 .iter()
209 .filter(|entry| entry.value().is_heartbeat_expired())
210 .map(|entry| entry.key().clone())
211 .collect()
212 }
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218 use crate::workers::job::{Job, JobPriority};
219 use tempfile::TempDir;
220
221 #[tokio::test]
222 async fn test_queue_enqueue_dequeue() {
223 let temp_dir = TempDir::new().unwrap();
224 let queue = JobQueue::new(temp_dir.path().to_str().unwrap().to_string()).unwrap();
225
226 let job = Job::new("test");
227 let job_id = queue.enqueue(job).await.unwrap();
228
229 let dequeued = queue.dequeue().await.unwrap();
230 assert!(dequeued.is_some());
231 assert_eq!(dequeued.unwrap().id, job_id);
232 }
233
234 #[tokio::test]
235 async fn test_queue_priority_ordering() {
236 let temp_dir = TempDir::new().unwrap();
237 let queue = JobQueue::new(temp_dir.path().to_str().unwrap().to_string()).unwrap();
238
239 queue
240 .enqueue(Job::new("low").with_priority(JobPriority::Low))
241 .await
242 .unwrap();
243 queue
244 .enqueue(Job::new("high").with_priority(JobPriority::High))
245 .await
246 .unwrap();
247 queue
248 .enqueue(Job::new("critical").with_priority(JobPriority::Critical))
249 .await
250 .unwrap();
251
252 let first = queue.dequeue().await.unwrap().unwrap();
253 assert_eq!(first.job_type, "critical");
254
255 let second = queue.dequeue().await.unwrap().unwrap();
256 assert_eq!(second.job_type, "high");
257
258 let third = queue.dequeue().await.unwrap().unwrap();
259 assert_eq!(third.job_type, "low");
260 }
261
262 #[tokio::test]
263 async fn test_queue_persistence() {
264 let temp_dir = TempDir::new().unwrap();
265 let path = temp_dir.path().to_str().unwrap().to_string();
266
267 {
269 let queue = JobQueue::new(path.clone()).unwrap();
270 queue.enqueue(Job::new("persistent")).await.unwrap();
271 }
272
273 {
275 let queue = JobQueue::new(path).unwrap();
276 let stats = queue.stats().await.unwrap();
277 assert_eq!(stats.pending, 1);
278 }
279 }
280}