aurora_db/workers/
queue.rs

1use super::job::{Job, JobStatus};
2use crate::error::{AuroraError, Result};
3use crate::storage::ColdStore;
4use dashmap::DashMap;
5use std::sync::Arc;
6
7/// Persistent job queue
8pub struct JobQueue {
9    // In-memory index for fast lookups
10    jobs: Arc<DashMap<String, Job>>,
11    // Persistent storage
12    storage: Arc<ColdStore>,
13    // Collection name for jobs
14    collection: String,
15}
16
17impl JobQueue {
18    pub fn new(storage_path: String) -> Result<Self> {
19        let storage = Arc::new(ColdStore::new(&storage_path)?);
20        let jobs = Arc::new(DashMap::new());
21
22        let queue = Self {
23            jobs,
24            storage,
25            collection: "__aurora_jobs".to_string(),
26        };
27
28        // Load existing jobs from storage
29        queue.load_jobs()?;
30
31        Ok(queue)
32    }
33
34    /// Load jobs from persistent storage into memory
35    fn load_jobs(&self) -> Result<()> {
36        // Scan all jobs from storage
37        let prefix = format!("{}:", self.collection);
38
39        for entry in self.storage.scan_prefix(&prefix) {
40            if let Ok((key, value)) = entry
41                && let Ok(job) = bincode::deserialize::<Job>(&value) {
42                    // Only load jobs that aren't completed
43                    if !matches!(job.status, JobStatus::Completed) {
44                        let job_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
45                        self.jobs.insert(job_id, job);
46                    }
47                }
48        }
49
50        Ok(())
51    }
52
53    /// Enqueue a new job
54    pub async fn enqueue(&self, job: Job) -> Result<String> {
55        let job_id = job.id.clone();
56        let key = format!("{}:{}", self.collection, job_id);
57
58        // Persist to storage
59        let serialized =
60            bincode::serialize(&job).map_err(|e| AuroraError::SerializationError(e.to_string()))?;
61        self.storage.set(key, serialized)?;
62
63        // Add to in-memory index
64        self.jobs.insert(job_id.clone(), job);
65
66        Ok(job_id)
67    }
68
69    /// Dequeue next job (highest priority, oldest first)
70    pub async fn dequeue(&self) -> Result<Option<Job>> {
71        let mut best_job: Option<(String, Job)> = None;
72
73        for entry in self.jobs.iter() {
74            let job = entry.value();
75
76            // Skip jobs that shouldn't run yet
77            if !job.should_run() {
78                continue;
79            }
80
81            // Skip non-pending jobs
82            if !matches!(job.status, JobStatus::Pending | JobStatus::Failed { .. }) {
83                continue;
84            }
85
86            // Find highest priority job
87            match &best_job {
88                None => {
89                    best_job = Some((entry.key().clone(), job.clone()));
90                }
91                Some((_, current_best)) => {
92                    // Higher priority wins, or same priority but older job wins
93                    if job.priority > current_best.priority
94                        || (job.priority == current_best.priority
95                            && job.created_at < current_best.created_at)
96                    {
97                        best_job = Some((entry.key().clone(), job.clone()));
98                    }
99                }
100            }
101        }
102
103        if let Some((job_id, mut job)) = best_job {
104            // Mark as running
105            job.mark_running();
106            self.update_job(&job_id, job.clone()).await?;
107            Ok(Some(job))
108        } else {
109            Ok(None)
110        }
111    }
112
113    /// Update job status
114    pub async fn update_job(&self, job_id: &str, job: Job) -> Result<()> {
115        let key = format!("{}:{}", self.collection, job_id);
116
117        // Persist to storage
118        let serialized =
119            bincode::serialize(&job).map_err(|e| AuroraError::SerializationError(e.to_string()))?;
120        self.storage.set(key, serialized)?;
121
122        // Update in-memory index
123        self.jobs.insert(job_id.to_string(), job);
124
125        Ok(())
126    }
127
128    /// Get job by ID
129    pub async fn get(&self, job_id: &str) -> Result<Option<Job>> {
130        Ok(self.jobs.get(job_id).map(|j| j.clone()))
131    }
132
133    /// Get job status
134    pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
135        Ok(self.jobs.get(job_id).map(|j| j.status.clone()))
136    }
137
138    /// Remove completed jobs (cleanup)
139    pub async fn cleanup_completed(&self) -> Result<usize> {
140        let mut removed = 0;
141
142        let to_remove: Vec<String> = self
143            .jobs
144            .iter()
145            .filter(|entry| matches!(entry.value().status, JobStatus::Completed))
146            .map(|entry| entry.key().clone())
147            .collect();
148
149        for job_id in to_remove {
150            let key = format!("{}:{}", self.collection, job_id);
151            self.storage.delete(&key)?;
152            self.jobs.remove(&job_id);
153            removed += 1;
154        }
155
156        Ok(removed)
157    }
158
159    /// Get queue statistics
160    pub async fn stats(&self) -> Result<super::QueueStats> {
161        let mut stats = super::QueueStats {
162            pending: 0,
163            running: 0,
164            completed: 0,
165            failed: 0,
166            dead_letter: 0,
167        };
168
169        for entry in self.jobs.iter() {
170            match &entry.value().status {
171                JobStatus::Pending => stats.pending += 1,
172                JobStatus::Running => stats.running += 1,
173                JobStatus::Completed => stats.completed += 1,
174                JobStatus::Failed { .. } => stats.failed += 1,
175                JobStatus::DeadLetter { .. } => stats.dead_letter += 1,
176            }
177        }
178
179        Ok(stats)
180    }
181}
182
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workers::job::{Job, JobPriority};
    use tempfile::TempDir;

    /// Opens a queue whose storage lives inside `dir`.
    fn open_queue(dir: &TempDir) -> JobQueue {
        let path = dir.path().to_str().unwrap().to_string();
        JobQueue::new(path).unwrap()
    }

    /// A single enqueued job comes back from `dequeue` with the same id.
    #[tokio::test]
    async fn test_queue_enqueue_dequeue() {
        let dir = TempDir::new().unwrap();
        let queue = open_queue(&dir);

        let expected_id = queue.enqueue(Job::new("test")).await.unwrap();

        let dequeued = queue.dequeue().await.unwrap();
        assert!(dequeued.is_some());
        assert_eq!(dequeued.unwrap().id, expected_id);
    }

    /// Jobs are handed out from highest to lowest priority, regardless of the
    /// order they were enqueued in.
    #[tokio::test]
    async fn test_queue_priority_ordering() {
        let dir = TempDir::new().unwrap();
        let queue = open_queue(&dir);

        let submissions = [
            ("low", JobPriority::Low),
            ("high", JobPriority::High),
            ("critical", JobPriority::Critical),
        ];
        for (job_type, priority) in submissions {
            queue
                .enqueue(Job::new(job_type).with_priority(priority))
                .await
                .unwrap();
        }

        let first = queue.dequeue().await.unwrap().unwrap();
        assert_eq!(first.job_type, "critical");

        let second = queue.dequeue().await.unwrap().unwrap();
        assert_eq!(second.job_type, "high");

        let third = queue.dequeue().await.unwrap().unwrap();
        assert_eq!(third.job_type, "low");
    }

    /// A job enqueued before the queue is dropped is still pending after the
    /// queue is reopened on the same path.
    #[tokio::test]
    async fn test_queue_persistence() {
        let dir = TempDir::new().unwrap();

        // First session: enqueue one job, then close the queue.
        let writer = open_queue(&dir);
        writer.enqueue(Job::new("persistent")).await.unwrap();
        drop(writer);

        // Second session: the job must have been reloaded from storage.
        let reader = open_queue(&dir);
        let stats = reader.stats().await.unwrap();
        assert_eq!(stats.pending, 1);
    }
}