// aurora_db/workers/queue.rs
use super::job::{Job, JobStatus};
2use crate::error::{AuroraError, Result};
3use crate::storage::ColdStore;
4use dashmap::DashMap;
5use std::sync::Arc;
6
/// A persistent job queue backed by `ColdStore`.
///
/// Jobs are mirrored in an in-memory `DashMap` for fast lookup and
/// written through to disk on every mutation so they survive restarts.
pub struct JobQueue {
    // In-memory index of live jobs, keyed by job id.
    jobs: Arc<DashMap<String, Job>>,
    // Durable backing store; every enqueue/update is persisted here.
    storage: Arc<ColdStore>,
    // Collection name used as the key prefix for job records in storage.
    collection: String,
}
16
17impl JobQueue {
18 pub fn new(storage_path: String) -> Result<Self> {
19 let storage = Arc::new(ColdStore::new(&storage_path)?);
20 let jobs = Arc::new(DashMap::new());
21
22 let queue = Self {
23 jobs,
24 storage,
25 collection: "__aurora_jobs".to_string(),
26 };
27
28 queue.load_jobs()?;
30
31 Ok(queue)
32 }
33
34 fn load_jobs(&self) -> Result<()> {
36 let prefix = format!("{}:", self.collection);
38
39 for entry in self.storage.scan_prefix(&prefix) {
40 if let Ok((key, value)) = entry {
41 if let Ok(job) = bincode::deserialize::<Job>(&value) {
42 if !matches!(job.status, JobStatus::Completed) {
44 let job_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
45 self.jobs.insert(job_id, job);
46 }
47 }
48 }
49 }
50
51 Ok(())
52 }
53
54 pub async fn enqueue(&self, job: Job) -> Result<String> {
56 let job_id = job.id.clone();
57 let key = format!("{}:{}", self.collection, job_id);
58
59 let serialized =
61 bincode::serialize(&job).map_err(|e| AuroraError::SerializationError(e.to_string()))?;
62 self.storage.set(key, serialized)?;
63
64 self.jobs.insert(job_id.clone(), job);
66
67 Ok(job_id)
68 }
69
70 pub async fn dequeue(&self) -> Result<Option<Job>> {
72 let mut best_job: Option<(String, Job)> = None;
73
74 for entry in self.jobs.iter() {
75 let job = entry.value();
76
77 if !job.should_run() {
79 continue;
80 }
81
82 if !matches!(job.status, JobStatus::Pending | JobStatus::Failed { .. }) {
84 continue;
85 }
86
87 match &best_job {
89 None => {
90 best_job = Some((entry.key().clone(), job.clone()));
91 }
92 Some((_, current_best)) => {
93 if job.priority > current_best.priority {
95 best_job = Some((entry.key().clone(), job.clone()));
96 }
97 else if job.priority == current_best.priority
99 && job.created_at < current_best.created_at
100 {
101 best_job = Some((entry.key().clone(), job.clone()));
102 }
103 }
104 }
105 }
106
107 if let Some((job_id, mut job)) = best_job {
108 job.mark_running();
110 self.update_job(&job_id, job.clone()).await?;
111 Ok(Some(job))
112 } else {
113 Ok(None)
114 }
115 }
116
117 pub async fn update_job(&self, job_id: &str, job: Job) -> Result<()> {
119 let key = format!("{}:{}", self.collection, job_id);
120
121 let serialized =
123 bincode::serialize(&job).map_err(|e| AuroraError::SerializationError(e.to_string()))?;
124 self.storage.set(key, serialized)?;
125
126 self.jobs.insert(job_id.to_string(), job);
128
129 Ok(())
130 }
131
132 pub async fn get(&self, job_id: &str) -> Result<Option<Job>> {
134 Ok(self.jobs.get(job_id).map(|j| j.clone()))
135 }
136
137 pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
139 Ok(self.jobs.get(job_id).map(|j| j.status.clone()))
140 }
141
142 pub async fn cleanup_completed(&self) -> Result<usize> {
144 let mut removed = 0;
145
146 let to_remove: Vec<String> = self
147 .jobs
148 .iter()
149 .filter(|entry| matches!(entry.value().status, JobStatus::Completed))
150 .map(|entry| entry.key().clone())
151 .collect();
152
153 for job_id in to_remove {
154 let key = format!("{}:{}", self.collection, job_id);
155 self.storage.delete(&key)?;
156 self.jobs.remove(&job_id);
157 removed += 1;
158 }
159
160 Ok(removed)
161 }
162
163 pub async fn stats(&self) -> Result<super::QueueStats> {
165 let mut stats = super::QueueStats {
166 pending: 0,
167 running: 0,
168 completed: 0,
169 failed: 0,
170 dead_letter: 0,
171 };
172
173 for entry in self.jobs.iter() {
174 match &entry.value().status {
175 JobStatus::Pending => stats.pending += 1,
176 JobStatus::Running => stats.running += 1,
177 JobStatus::Completed => stats.completed += 1,
178 JobStatus::Failed { .. } => stats.failed += 1,
179 JobStatus::DeadLetter { .. } => stats.dead_letter += 1,
180 }
181 }
182
183 Ok(stats)
184 }
185}
186
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workers::job::{Job, JobPriority};
    use tempfile::TempDir;

    /// Builds a queue rooted in the given temporary directory.
    fn fresh_queue(dir: &TempDir) -> JobQueue {
        JobQueue::new(dir.path().to_str().unwrap().to_string()).unwrap()
    }

    #[tokio::test]
    async fn test_queue_enqueue_dequeue() {
        let dir = TempDir::new().unwrap();
        let queue = fresh_queue(&dir);

        let id = queue.enqueue(Job::new("test")).await.unwrap();

        // The job we just enqueued should come straight back out.
        let taken = queue.dequeue().await.unwrap();
        assert!(taken.is_some());
        assert_eq!(taken.unwrap().id, id);
    }

    #[tokio::test]
    async fn test_queue_priority_ordering() {
        let dir = TempDir::new().unwrap();
        let queue = fresh_queue(&dir);

        // Enqueue in ascending priority order...
        for (name, priority) in [
            ("low", JobPriority::Low),
            ("high", JobPriority::High),
            ("critical", JobPriority::Critical),
        ] {
            queue
                .enqueue(Job::new(name).with_priority(priority))
                .await
                .unwrap();
        }

        // ...and expect them back highest-priority first.
        for expected in ["critical", "high", "low"] {
            let job = queue.dequeue().await.unwrap().unwrap();
            assert_eq!(job.job_type, expected);
        }
    }

    #[tokio::test]
    async fn test_queue_persistence() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().to_str().unwrap().to_string();

        // First instance writes one job, then is dropped.
        {
            let queue = JobQueue::new(path.clone()).unwrap();
            queue.enqueue(Job::new("persistent")).await.unwrap();
        }

        // A fresh instance over the same path must reload it as pending.
        let queue = JobQueue::new(path).unwrap();
        let stats = queue.stats().await.unwrap();
        assert_eq!(stats.pending, 1);
    }
}