//! aurora_db/workers/queue.rs — persistent, priority-ordered job queue.
1use super::job::{Job, JobStatus};
2use crate::error::{AqlError, ErrorCode, Result};
3use crate::storage::ColdStore;
4use dashmap::DashMap;
5use std::sync::Arc;
6
7use tokio::sync::Notify;
8
/// Persistent job queue.
///
/// Jobs live in an in-memory `DashMap` for fast lookup and are mirrored to
/// `ColdStore` under a key prefix so the queue survives restarts. Idle
/// workers park on `notify` and are woken when new work arrives.
pub struct JobQueue {
    /// In-memory index for fast lookups (job ID -> job).
    jobs: Arc<DashMap<String, Job>>,
    /// Persistent storage backing the in-memory index.
    storage: Arc<ColdStore>,
    /// Collection name for jobs; used as the storage key prefix (`"<collection>:<job_id>"`).
    collection: String,
    /// Notification handle for worker wake-up.
    notify: Arc<Notify>,
}
20
21impl JobQueue {
22    pub fn new(storage_path: String) -> Result<Self> {
23        let storage = Arc::new(ColdStore::new(&storage_path)?);
24        let jobs = Arc::new(DashMap::new());
25        let notify = Arc::new(Notify::new());
26
27        let queue = Self {
28            jobs,
29            storage,
30            collection: "__aurora_jobs".to_string(),
31            notify,
32        };
33
34        // Load existing jobs from storage
35        queue.load_jobs()?;
36
37        Ok(queue)
38    }
39
40    /// Load jobs from persistent storage into memory
41    fn load_jobs(&self) -> Result<()> {
42        // Scan all jobs from storage
43        let prefix = format!("{}:", self.collection);
44
45        for entry in self.storage.scan_prefix(&prefix) {
46            if let Ok((key, value)) = entry
47                && let Ok(job) = bincode::deserialize::<Job>(&value)
48            {
49                // Only load jobs that aren't completed
50                if !matches!(job.status, JobStatus::Completed) {
51                    let job_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
52                    self.jobs.insert(job_id, job);
53                }
54            }
55        }
56
57        Ok(())
58    }
59
60    /// Enqueue a new job
61    pub async fn enqueue(&self, job: Job) -> Result<String> {
62        let job_id = job.id.clone();
63        let key = format!("{}:{}", self.collection, job_id);
64
65        // Persist to storage
66        let serialized = bincode::serialize(&job)
67            .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
68        self.storage.set(key, serialized)?;
69
70        // Add to in-memory index
71        self.jobs.insert(job_id.clone(), job);
72
73        // Wake up waiting workers
74        self.notify.notify_waiters();
75
76        Ok(job_id)
77    }
78
79    /// Wait for a notification
80    pub async fn notified(&self) {
81        self.notify.notified().await;
82    }
83
84    /// Notify all waiting workers (used for shutdown)
85    pub fn notify_all(&self) {
86        self.notify.notify_waiters();
87    }
88
89    /// Dequeue next job (highest priority, oldest first)
90    pub async fn dequeue(&self) -> Result<Option<Job>> {
91        let mut best_job: Option<(String, Job)> = None;
92
93        for entry in self.jobs.iter() {
94            let job = entry.value();
95
96            // Skip jobs that shouldn't run yet
97            if !job.should_run() {
98                continue;
99            }
100
101            // Skip non-pending jobs
102            if !matches!(job.status, JobStatus::Pending | JobStatus::Failed { .. }) {
103                continue;
104            }
105
106            // Find highest priority job
107            match &best_job {
108                None => {
109                    best_job = Some((entry.key().clone(), job.clone()));
110                }
111                Some((_, current_best)) => {
112                    // Higher priority wins, or same priority but older job wins
113                    if job.priority > current_best.priority
114                        || (job.priority == current_best.priority
115                            && job.created_at < current_best.created_at)
116                    {
117                        best_job = Some((entry.key().clone(), job.clone()));
118                    }
119                }
120            }
121        }
122
123        if let Some((job_id, mut job)) = best_job {
124            // Mark as running
125            job.mark_running();
126            self.update_job(&job_id, job.clone()).await?;
127            Ok(Some(job))
128        } else {
129            Ok(None)
130        }
131    }
132
133    /// Update job status
134    pub async fn update_job(&self, job_id: &str, job: Job) -> Result<()> {
135        let key = format!("{}:{}", self.collection, job_id);
136
137        // Persist to storage
138        let serialized = bincode::serialize(&job)
139            .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
140        self.storage.set(key, serialized)?;
141
142        // Update in-memory index
143        self.jobs.insert(job_id.to_string(), job);
144
145        Ok(())
146    }
147
148    /// Get job by ID
149    pub async fn get(&self, job_id: &str) -> Result<Option<Job>> {
150        Ok(self.jobs.get(job_id).map(|j| j.clone()))
151    }
152
153    /// Get job status
154    pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
155        Ok(self.jobs.get(job_id).map(|j| j.status.clone()))
156    }
157
158    /// Remove completed jobs (cleanup)
159    pub async fn cleanup_completed(&self) -> Result<usize> {
160        let mut removed = 0;
161
162        let to_remove: Vec<String> = self
163            .jobs
164            .iter()
165            .filter(|entry| matches!(entry.value().status, JobStatus::Completed))
166            .map(|entry| entry.key().clone())
167            .collect();
168
169        for job_id in to_remove {
170            let key = format!("{}:{}", self.collection, job_id);
171            self.storage.delete(&key)?;
172            self.jobs.remove(&job_id);
173            removed += 1;
174        }
175
176        Ok(removed)
177    }
178
179    /// Get queue statistics
180    pub async fn stats(&self) -> Result<super::QueueStats> {
181        let mut stats = super::QueueStats {
182            pending: 0,
183            running: 0,
184            completed: 0,
185            failed: 0,
186            dead_letter: 0,
187        };
188
189        for entry in self.jobs.iter() {
190            match &entry.value().status {
191                JobStatus::Pending => stats.pending += 1,
192                JobStatus::Running => stats.running += 1,
193                JobStatus::Completed => stats.completed += 1,
194                JobStatus::Failed { .. } => stats.failed += 1,
195                JobStatus::DeadLetter { .. } => stats.dead_letter += 1,
196            }
197        }
198
199        Ok(stats)
200    }
201
202    /// Find zombie jobs (Running with expired heartbeat)
203    ///
204    /// Efficiently scans running jobs using the in-memory DashMap index.
205    /// Returns job IDs of zombies that need to be reset to Pending.
206    pub async fn find_zombie_jobs(&self) -> Vec<String> {
207        self.jobs
208            .iter()
209            .filter(|entry| entry.value().is_heartbeat_expired())
210            .map(|entry| entry.key().clone())
211            .collect()
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workers::job::{Job, JobPriority};
    use tempfile::TempDir;

    /// Open a fresh queue rooted in `dir`.
    fn open_queue(dir: &TempDir) -> JobQueue {
        JobQueue::new(dir.path().to_str().unwrap().to_string()).unwrap()
    }

    #[tokio::test]
    async fn test_queue_enqueue_dequeue() {
        let dir = TempDir::new().unwrap();
        let queue = open_queue(&dir);

        // An enqueued job should come straight back out with the same ID.
        let id = queue.enqueue(Job::new("test")).await.unwrap();

        let next = queue.dequeue().await.unwrap();
        assert!(next.is_some());
        assert_eq!(next.unwrap().id, id);
    }

    #[tokio::test]
    async fn test_queue_priority_ordering() {
        let dir = TempDir::new().unwrap();
        let queue = open_queue(&dir);

        // Enqueue in ascending priority order...
        for (job_type, priority) in [
            ("low", JobPriority::Low),
            ("high", JobPriority::High),
            ("critical", JobPriority::Critical),
        ] {
            queue
                .enqueue(Job::new(job_type).with_priority(priority))
                .await
                .unwrap();
        }

        // ...and expect them back highest-priority first.
        for expected in ["critical", "high", "low"] {
            let job = queue.dequeue().await.unwrap().unwrap();
            assert_eq!(job.job_type, expected);
        }
    }

    #[tokio::test]
    async fn test_queue_persistence() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().to_str().unwrap().to_string();

        // First instance writes one job, then is dropped.
        {
            let queue = JobQueue::new(path.clone()).unwrap();
            queue.enqueue(Job::new("persistent")).await.unwrap();
        }

        // A fresh instance over the same path must recover the pending job.
        let queue = JobQueue::new(path).unwrap();
        assert_eq!(queue.stats().await.unwrap().pending, 1);
    }
}