// aurora_db/workers/queue.rs

use super::job::{Job, JobStatus};
use crate::error::{AuroraError, Result};
use crate::storage::ColdStore;
use dashmap::DashMap;
use std::sync::Arc;

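/// A persistent job queue: every job is written to the backing `ColdStore`
/// and mirrored in an in-memory `DashMap` for fast lookup and scheduling.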
pub struct JobQueue {
    jobs: Arc<DashMap<String, Job>>,
    storage: Arc<ColdStore>,
    collection: String,
}

impl JobQueue {
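    /// Creates a queue backed by a `ColdStore` at `storage_path` and
    /// reloads any persisted jobs that have not yet completed.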
    pub fn new(storage_path: String) -> Result<Self> {
        let storage = Arc::new(ColdStore::new(&storage_path)?);
        let jobs = Arc::new(DashMap::new());

        let queue = Self {
            jobs,
            storage,
            collection: "__aurora_jobs".to_string(),
        };

        queue.load_jobs()?;

        Ok(queue)
    }

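    /// Scans storage under this queue's collection prefix and rebuilds the
    /// in-memory map, skipping jobs whose status is `Completed`.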
    fn load_jobs(&self) -> Result<()> {
        let prefix = format!("{}:", self.collection);

        for entry in self.storage.scan_prefix(&prefix) {
            if let Ok((key, value)) = entry
                && let Ok(job) = bincode::deserialize::<Job>(&value)
            {
                if !matches!(job.status, JobStatus::Completed) {
                    let job_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
                    self.jobs.insert(job_id, job);
                }
            }
        }

        Ok(())
    }

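    /// Serializes the job to storage, indexes it in memory, and returns
    /// its id.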
    pub async fn enqueue(&self, job: Job) -> Result<String> {
        let job_id = job.id.clone();
        let key = format!("{}:{}", self.collection, job_id);

        let serialized =
            bincode::serialize(&job).map_err(|e| AuroraError::SerializationError(e.to_string()))?;
        self.storage.set(key, serialized)?;

        self.jobs.insert(job_id.clone(), job);

        Ok(job_id)
    }

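    /// Selects the next runnable job: only `Pending` or `Failed` jobs that
    /// report `should_run()` are considered, preferring higher priority and,
    /// on ties, the earliest `created_at`. The winner is marked running and
    /// persisted before being returned; `None` means nothing is runnable.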
    pub async fn dequeue(&self) -> Result<Option<Job>> {
        let mut best_job: Option<(String, Job)> = None;

        for entry in self.jobs.iter() {
            let job = entry.value();

            if !job.should_run() {
                continue;
            }

            if !matches!(job.status, JobStatus::Pending | JobStatus::Failed { .. }) {
                continue;
            }

            match &best_job {
                None => {
                    best_job = Some((entry.key().clone(), job.clone()));
                }
                Some((_, current_best)) => {
                    if job.priority > current_best.priority
                        || (job.priority == current_best.priority
                            && job.created_at < current_best.created_at)
                    {
                        best_job = Some((entry.key().clone(), job.clone()));
                    }
                }
            }
        }

        if let Some((job_id, mut job)) = best_job {
            job.mark_running();
            self.update_job(&job_id, job.clone()).await?;
            Ok(Some(job))
        } else {
            Ok(None)
        }
    }

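    /// Writes an updated job record to storage and replaces the in-memory
    /// copy.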
    pub async fn update_job(&self, job_id: &str, job: Job) -> Result<()> {
        let key = format!("{}:{}", self.collection, job_id);

        let serialized =
            bincode::serialize(&job).map_err(|e| AuroraError::SerializationError(e.to_string()))?;
        self.storage.set(key, serialized)?;

        self.jobs.insert(job_id.to_string(), job);

        Ok(())
    }

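    /// Returns a clone of the job with the given id, if present.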
    pub async fn get(&self, job_id: &str) -> Result<Option<Job>> {
        Ok(self.jobs.get(job_id).map(|j| j.clone()))
    }

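    /// Returns just the status of the job with the given id, if present.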
    pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
        Ok(self.jobs.get(job_id).map(|j| j.status.clone()))
    }

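    /// Deletes all `Completed` jobs from storage and memory, returning the
    /// number removed.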
    pub async fn cleanup_completed(&self) -> Result<usize> {
        let mut removed = 0;

        let to_remove: Vec<String> = self
            .jobs
            .iter()
            .filter(|entry| matches!(entry.value().status, JobStatus::Completed))
            .map(|entry| entry.key().clone())
            .collect();

        for job_id in to_remove {
            let key = format!("{}:{}", self.collection, job_id);
            self.storage.delete(&key)?;
            self.jobs.remove(&job_id);
            removed += 1;
        }

        Ok(removed)
    }

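    /// Tallies jobs by status into a `QueueStats` snapshot.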
    pub async fn stats(&self) -> Result<super::QueueStats> {
        let mut stats = super::QueueStats {
            pending: 0,
            running: 0,
            completed: 0,
            failed: 0,
            dead_letter: 0,
        };

        for entry in self.jobs.iter() {
            match &entry.value().status {
                JobStatus::Pending => stats.pending += 1,
                JobStatus::Running => stats.running += 1,
                JobStatus::Completed => stats.completed += 1,
                JobStatus::Failed { .. } => stats.failed += 1,
                JobStatus::DeadLetter { .. } => stats.dead_letter += 1,
            }
        }

        Ok(stats)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::workers::job::{Job, JobPriority};
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_queue_enqueue_dequeue() {
        let temp_dir = TempDir::new().unwrap();
        let queue = JobQueue::new(temp_dir.path().to_str().unwrap().to_string()).unwrap();

        let job = Job::new("test");
        let job_id = queue.enqueue(job).await.unwrap();

        let dequeued = queue.dequeue().await.unwrap();
        assert!(dequeued.is_some());
        assert_eq!(dequeued.unwrap().id, job_id);
    }

    #[tokio::test]
    async fn test_queue_priority_ordering() {
        let temp_dir = TempDir::new().unwrap();
        let queue = JobQueue::new(temp_dir.path().to_str().unwrap().to_string()).unwrap();

        queue
            .enqueue(Job::new("low").with_priority(JobPriority::Low))
            .await
            .unwrap();
        queue
            .enqueue(Job::new("high").with_priority(JobPriority::High))
            .await
            .unwrap();
        queue
            .enqueue(Job::new("critical").with_priority(JobPriority::Critical))
            .await
            .unwrap();

        let first = queue.dequeue().await.unwrap().unwrap();
        assert_eq!(first.job_type, "critical");

        let second = queue.dequeue().await.unwrap().unwrap();
        assert_eq!(second.job_type, "high");

        let third = queue.dequeue().await.unwrap().unwrap();
        assert_eq!(third.job_type, "low");
    }

    #[tokio::test]
    async fn test_queue_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_str().unwrap().to_string();

        {
            let queue = JobQueue::new(path.clone()).unwrap();
            queue.enqueue(Job::new("persistent")).await.unwrap();
        }

        {
            let queue = JobQueue::new(path).unwrap();
            let stats = queue.stats().await.unwrap();
            assert_eq!(stats.pending, 1);
        }
    }
}