aurora_db/workers/
queue.rs1use super::job::{Job, JobStatus};
2use crate::error::{AqlError, ErrorCode, Result};
3use crate::storage::ColdStore;
4use dashmap::DashMap;
5use std::sync::Arc;
6use tokio::sync::mpsc;
7
/// Persistent job queue: jobs are held in an in-memory `DashMap` for fast
/// lookup and mirrored to a `ColdStore` so unfinished work survives restarts.
pub struct JobQueue {
 // All known jobs keyed by job id (pending, running, failed, dead-letter,
 // and completed-until-cleanup entries alike).
 jobs: Arc<DashMap<String, Job>>,
 // Producer half of the pending-work channel; carries job ids awaiting a
 // worker, plus the "__SHUTDOWN__" sentinel injected by `shutdown()`.
 pending_tx: mpsc::UnboundedSender<String>,
 // Consumer half, behind an async mutex so multiple worker tasks can share
 // the single receiver (only one `dequeue` call drains it at a time).
 pending_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<String>>>,
 // Durable backing store for serialized job records.
 storage: Arc<ColdStore>,
 // Key prefix (collection name) under which jobs are persisted.
 collection: String,
}
20
21impl JobQueue {
22 pub fn new(storage_path: String) -> Result<Self> {
23 let storage = Arc::new(ColdStore::new(&storage_path)?);
24 let jobs = Arc::new(DashMap::new());
25 let (pending_tx, pending_rx) = mpsc::unbounded_channel();
26
27 let queue = Self {
28 jobs,
29 pending_tx,
30 pending_rx: Arc::new(tokio::sync::Mutex::new(pending_rx)),
31 storage,
32 collection: "__aurora_jobs".to_string(),
33 };
34
35 queue.load_jobs()?;
37
38 Ok(queue)
39 }
40
41 fn load_jobs(&self) -> Result<()> {
43 let prefix = format!("{}:", self.collection);
44
45 for entry in self.storage.scan_prefix(&prefix) {
46 if let Ok((key, value)) = entry
47 && let Ok(job) = bincode::deserialize::<Job>(&value)
48 {
49 if !matches!(job.status, JobStatus::Completed) {
50 let job_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
51 self.jobs.insert(job_id.clone(), job);
52
53 let _ = self.pending_tx.send(job_id);
55 }
56 }
57 }
58
59 Ok(())
60 }
61
62 pub async fn enqueue(&self, job: Job) -> Result<String> {
64 let job_id = job.id.clone();
65 let key = format!("{}:{}", self.collection, job_id);
66
67 self.jobs.insert(job_id.clone(), job.clone());
69
70 let serialized = bincode::serialize(&job)
73 .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
74 self.storage.set(key, serialized)?;
75
76 let _ = self.pending_tx.send(job_id.clone());
78
79 Ok(job_id)
80 }
81
82 pub async fn dequeue(&self) -> Result<Option<Job>> {
84 let mut rx = self.pending_rx.lock().await;
85
86 while let Some(job_id) = rx.recv().await {
87 if job_id == "__SHUTDOWN__" {
89 return Ok(None);
90 }
91
92 if let Some(mut job) = self.jobs.get_mut(&job_id) {
94 if matches!(job.status, JobStatus::Pending | JobStatus::Failed { .. }) && job.should_run() {
95 job.mark_running();
97 let job_clone = job.clone();
98 drop(job);
99
100 self.update_job(&job_id, job_clone.clone()).await?;
102
103 return Ok(Some(job_clone));
104 }
105 }
106 }
107
108 Ok(None)
109 }
110
111 pub async fn update_job(&self, job_id: &str, job: Job) -> Result<()> {
113 let key = format!("{}:{}", self.collection, job_id);
114
115 self.jobs.insert(job_id.to_string(), job.clone());
117
118 let serialized = bincode::serialize(&job)
120 .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
121 self.storage.set(key, serialized)?;
122
123 Ok(())
124 }
125
126 pub async fn shutdown(&self) {
128 let _ = self.pending_tx.send("__SHUTDOWN__".to_string());
131 }
132
133 pub fn notify_all(&self) {}
135 pub async fn notified(&self) {
136 }
138
139 pub async fn get(&self, job_id: &str) -> Result<Option<Job>> {
141 Ok(self.jobs.get(job_id).map(|j| j.clone()))
142 }
143
144 pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
146 Ok(self.jobs.get(job_id).map(|j| j.status.clone()))
147 }
148
149 pub async fn cleanup_completed(&self) -> Result<usize> {
151 let mut removed = 0;
152 let to_remove: Vec<String> = self.jobs.iter()
153 .filter(|entry| matches!(entry.value().status, JobStatus::Completed))
154 .map(|entry| entry.key().clone())
155 .collect();
156
157 for job_id in to_remove {
158 let key = format!("{}:{}", self.collection, job_id);
159 let _ = self.storage.delete(&key);
160 self.jobs.remove(&job_id);
161 removed += 1;
162 }
163 Ok(removed)
164 }
165
166 pub async fn stats(&self) -> Result<super::QueueStats> {
168 let mut stats = super::QueueStats {
169 pending: 0, running: 0, completed: 0, failed: 0, dead_letter: 0,
170 };
171 for entry in self.jobs.iter() {
172 match &entry.value().status {
173 JobStatus::Pending => stats.pending += 1,
174 JobStatus::Running => stats.running += 1,
175 JobStatus::Completed => stats.completed += 1,
176 JobStatus::Failed { .. } => stats.failed += 1,
177 JobStatus::DeadLetter { .. } => stats.dead_letter += 1,
178 }
179 }
180 Ok(stats)
181 }
182
183 pub async fn find_zombie_jobs(&self) -> Vec<String> {
185 self.jobs.iter()
186 .filter(|entry| entry.value().is_heartbeat_expired())
187 .map(|entry| entry.key().clone())
188 .collect()
189 }
190}