// aurora_db/workers/queue.rs

use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use tokio::sync::mpsc;

use super::job::{Job, JobStatus};
use crate::error::{AqlError, ErrorCode, Result};
use crate::storage::ColdStore;

8/// Persistent job queue
9pub struct JobQueue {
10    // In-memory index for all jobs (status tracking)
11    jobs: Arc<DashMap<String, Job>>,
12    // Fast-path channel for pending jobs (avoid scanning)
13    pending_tx: mpsc::UnboundedSender<String>,
14    pending_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<String>>>,
15    // Persistent storage
16    storage: Arc<ColdStore>,
17    // Collection name for jobs
18    collection: String,
19}
20
21impl JobQueue {
22    pub fn new(storage_path: String) -> Result<Self> {
23        let storage = Arc::new(ColdStore::new(&storage_path)?);
24        let jobs = Arc::new(DashMap::new());
25        let (pending_tx, pending_rx) = mpsc::unbounded_channel();
26
27        let queue = Self {
28            jobs,
29            pending_tx,
30            pending_rx: Arc::new(tokio::sync::Mutex::new(pending_rx)),
31            storage,
32            collection: "__aurora_jobs".to_string(),
33        };
34
35        // Load existing jobs from storage
36        queue.load_jobs()?;
37
38        Ok(queue)
39    }
40
41    /// Load jobs from persistent storage into memory
42    fn load_jobs(&self) -> Result<()> {
43        let prefix = format!("{}:", self.collection);
44
45        for entry in self.storage.scan_prefix(&prefix) {
46            if let Ok((key, value)) = entry
47                && let Ok(job) = bincode::deserialize::<Job>(&value)
48            {
49                if !matches!(job.status, JobStatus::Completed) {
50                    let job_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
51                    self.jobs.insert(job_id.clone(), job);
52
53                    // Re-enqueue pending jobs into fast-path
54                    let _ = self.pending_tx.send(job_id);
55                }
56            }
57        }
58
59        Ok(())
60    }
61
62    /// Enqueue a new job
63    pub async fn enqueue(&self, job: Job) -> Result<String> {
64        let job_id = job.id.clone();
65        let key = format!("{}:{}", self.collection, job_id);
66
67        // 1. ATOMIC FAST-PATH: Add to in-memory index immediately
68        self.jobs.insert(job_id.clone(), job.clone());
69
70        // 2. PERSISTENCE: Write to storage (Crucial for durability)
71        // Optimization: In high-throughput scenarios, we could batch these.
72        let serialized = bincode::serialize(&job)
73            .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
74        self.storage.set(key, serialized)?;
75
76        // 3. NOTIFY: Wake up waiting workers instantly via channel
77        let _ = self.pending_tx.send(job_id.clone());
78
79        Ok(job_id)
80    }
81
82    /// Dequeue next job (O(1) via channel instead of O(N) scan)
83    pub async fn dequeue(&self) -> Result<Option<Job>> {
84        let mut rx = self.pending_rx.lock().await;
85
86        while let Some(job_id) = rx.recv().await {
87            // Check for shutdown sentinel
88            if job_id == "__SHUTDOWN__" {
89                return Ok(None);
90            }
91
92            // Check if job is still valid and pending
93            if let Some(mut job) = self.jobs.get_mut(&job_id) {
94                if matches!(job.status, JobStatus::Pending | JobStatus::Failed { .. })
95                    && job.should_run()
96                {
97                    // Mark as running
98                    job.mark_running();
99                    let job_clone = job.clone();
100                    drop(job);
101
102                    // Persist state change (Pending -> Running)
103                    self.update_job(&job_id, job_clone.clone()).await?;
104
105                    return Ok(Some(job_clone));
106                }
107            }
108        }
109
110        Ok(None)
111    }
112
113    /// Update job status
114    pub async fn update_job(&self, job_id: &str, job: Job) -> Result<()> {
115        let key = format!("{}:{}", self.collection, job_id);
116
117        // Update in-memory index first (O(1))
118        self.jobs.insert(job_id.to_string(), job.clone());
119
120        // Persist to storage
121        let serialized = bincode::serialize(&job)
122            .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
123        self.storage.set(key, serialized)?;
124
125        Ok(())
126    }
127
128    /// Shutdown the queue (wakes up all waiting workers)
129    pub async fn shutdown(&self) {
130        // We use a sentinel string because the receiver is locked in dequeue().
131        // Sending this will wake up the worker waiting on rx.recv().await.
132        let _ = self.pending_tx.send("__SHUTDOWN__".to_string());
133    }
134
135    // Dummy method for backward compatibility with executor.rs notify calls
136    pub fn notify_all(&self) {}
137    pub async fn notified(&self) {
138        // Not used anymore due to channel-based dequeue
139    }
140
141    /// Get job by ID
142    pub async fn get(&self, job_id: &str) -> Result<Option<Job>> {
143        Ok(self.jobs.get(job_id).map(|j| j.clone()))
144    }
145
146    /// Get job status
147    pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
148        Ok(self.jobs.get(job_id).map(|j| j.status.clone()))
149    }
150
151    /// Remove completed jobs (cleanup)
152    pub async fn cleanup_completed(&self) -> Result<usize> {
153        let mut removed = 0;
154        let to_remove: Vec<String> = self
155            .jobs
156            .iter()
157            .filter(|entry| matches!(entry.value().status, JobStatus::Completed))
158            .map(|entry| entry.key().clone())
159            .collect();
160
161        for job_id in to_remove {
162            let key = format!("{}:{}", self.collection, job_id);
163            let _ = self.storage.delete(&key);
164            self.jobs.remove(&job_id);
165            removed += 1;
166        }
167        Ok(removed)
168    }
169
170    /// Get queue statistics
171    pub async fn stats(&self) -> Result<super::QueueStats> {
172        let mut stats = super::QueueStats {
173            pending: 0,
174            running: 0,
175            completed: 0,
176            failed: 0,
177            dead_letter: 0,
178        };
179        for entry in self.jobs.iter() {
180            match &entry.value().status {
181                JobStatus::Pending => stats.pending += 1,
182                JobStatus::Running => stats.running += 1,
183                JobStatus::Completed => stats.completed += 1,
184                JobStatus::Failed { .. } => stats.failed += 1,
185                JobStatus::DeadLetter { .. } => stats.dead_letter += 1,
186            }
187        }
188        Ok(stats)
189    }
190
191    /// Find zombie jobs
192    pub async fn find_zombie_jobs(&self) -> Vec<String> {
193        self.jobs
194            .iter()
195            .filter(|entry| entry.value().is_heartbeat_expired())
196            .map(|entry| entry.key().clone())
197            .collect()
198    }
199}