aurora_db/workers/
queue.rs1use super::job::{Job, JobStatus};
2use crate::error::{AqlError, ErrorCode, Result};
3use crate::storage::ColdStore;
4use dashmap::DashMap;
5use std::sync::Arc;
6use tokio::sync::mpsc;
7
8pub struct JobQueue {
10 jobs: Arc<DashMap<String, Job>>,
12 pending_tx: mpsc::UnboundedSender<String>,
14 pending_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<String>>>,
15 storage: Arc<ColdStore>,
17 collection: String,
19}
20
21impl JobQueue {
22 pub fn new(storage_path: String) -> Result<Self> {
23 let storage = Arc::new(ColdStore::new(&storage_path)?);
24 let jobs = Arc::new(DashMap::new());
25 let (pending_tx, pending_rx) = mpsc::unbounded_channel();
26
27 let queue = Self {
28 jobs,
29 pending_tx,
30 pending_rx: Arc::new(tokio::sync::Mutex::new(pending_rx)),
31 storage,
32 collection: "__aurora_jobs".to_string(),
33 };
34
35 queue.load_jobs()?;
37
38 Ok(queue)
39 }
40
41 fn load_jobs(&self) -> Result<()> {
43 let prefix = format!("{}:", self.collection);
44
45 for entry in self.storage.scan_prefix(&prefix) {
46 if let Ok((key, value)) = entry
47 && let Ok(job) = bincode::deserialize::<Job>(&value)
48 {
49 if !matches!(job.status, JobStatus::Completed) {
50 let job_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
51 self.jobs.insert(job_id.clone(), job);
52
53 let _ = self.pending_tx.send(job_id);
55 }
56 }
57 }
58
59 Ok(())
60 }
61
62 pub async fn enqueue(&self, job: Job) -> Result<String> {
64 let job_id = job.id.clone();
65 let key = format!("{}:{}", self.collection, job_id);
66
67 self.jobs.insert(job_id.clone(), job.clone());
69
70 let serialized = bincode::serialize(&job)
73 .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
74 self.storage.set(key, serialized)?;
75
76 let _ = self.pending_tx.send(job_id.clone());
78
79 Ok(job_id)
80 }
81
82 pub async fn dequeue(&self) -> Result<Option<Job>> {
84 let mut rx = self.pending_rx.lock().await;
85
86 while let Some(job_id) = rx.recv().await {
87 if job_id == "__SHUTDOWN__" {
89 return Ok(None);
90 }
91
92 if let Some(mut job) = self.jobs.get_mut(&job_id) {
94 if matches!(job.status, JobStatus::Pending | JobStatus::Failed { .. })
95 && job.should_run()
96 {
97 job.mark_running();
99 let job_clone = job.clone();
100 drop(job);
101
102 self.update_job(&job_id, job_clone.clone()).await?;
104
105 return Ok(Some(job_clone));
106 }
107 }
108 }
109
110 Ok(None)
111 }
112
113 pub async fn update_job(&self, job_id: &str, job: Job) -> Result<()> {
115 let key = format!("{}:{}", self.collection, job_id);
116
117 self.jobs.insert(job_id.to_string(), job.clone());
119
120 let serialized = bincode::serialize(&job)
122 .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
123 self.storage.set(key, serialized)?;
124
125 Ok(())
126 }
127
128 pub async fn shutdown(&self) {
130 let _ = self.pending_tx.send("__SHUTDOWN__".to_string());
133 }
134
135 pub fn notify_all(&self) {}
137 pub async fn notified(&self) {
138 }
140
141 pub async fn get(&self, job_id: &str) -> Result<Option<Job>> {
143 Ok(self.jobs.get(job_id).map(|j| j.clone()))
144 }
145
146 pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
148 Ok(self.jobs.get(job_id).map(|j| j.status.clone()))
149 }
150
151 pub async fn cleanup_completed(&self) -> Result<usize> {
153 let mut removed = 0;
154 let to_remove: Vec<String> = self
155 .jobs
156 .iter()
157 .filter(|entry| matches!(entry.value().status, JobStatus::Completed))
158 .map(|entry| entry.key().clone())
159 .collect();
160
161 for job_id in to_remove {
162 let key = format!("{}:{}", self.collection, job_id);
163 let _ = self.storage.delete(&key);
164 self.jobs.remove(&job_id);
165 removed += 1;
166 }
167 Ok(removed)
168 }
169
170 pub async fn stats(&self) -> Result<super::QueueStats> {
172 let mut stats = super::QueueStats {
173 pending: 0,
174 running: 0,
175 completed: 0,
176 failed: 0,
177 dead_letter: 0,
178 };
179 for entry in self.jobs.iter() {
180 match &entry.value().status {
181 JobStatus::Pending => stats.pending += 1,
182 JobStatus::Running => stats.running += 1,
183 JobStatus::Completed => stats.completed += 1,
184 JobStatus::Failed { .. } => stats.failed += 1,
185 JobStatus::DeadLetter { .. } => stats.dead_letter += 1,
186 }
187 }
188 Ok(stats)
189 }
190
191 pub async fn find_zombie_jobs(&self) -> Vec<String> {
193 self.jobs
194 .iter()
195 .filter(|entry| entry.value().is_heartbeat_expired())
196 .map(|entry| entry.key().clone())
197 .collect()
198 }
199}