//! aurora_db/workers/queue.rs — persistent job queue for background workers.

1use super::job::{Job, JobStatus};
2use crate::error::{AqlError, ErrorCode, Result};
3use crate::storage::ColdStore;
4use dashmap::DashMap;
5use std::sync::Arc;
6use tokio::sync::mpsc;
7
/// Persistent job queue backed by `ColdStore`, with an in-memory index for
/// O(1) status lookups and an mpsc channel as a fast dequeue path.
pub struct JobQueue {
    // In-memory index of every known job, keyed by job id (status tracking)
    jobs: Arc<DashMap<String, Job>>,
    // Fast-path channel carrying ids of pending jobs (avoids scanning the map)
    pending_tx: mpsc::UnboundedSender<String>,
    // Receiver half, behind an async mutex so workers can take turns
    // waiting in `dequeue` without requiring `&mut self`
    pending_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<String>>>,
    // Persistent storage; jobs survive process restarts
    storage: Arc<ColdStore>,
    // Storage collection name, used as the key prefix for all job records
    collection: String,
}
20
21impl JobQueue {
22    pub fn new(storage_path: String) -> Result<Self> {
23        let storage = Arc::new(ColdStore::new(&storage_path)?);
24        let jobs = Arc::new(DashMap::new());
25        let (pending_tx, pending_rx) = mpsc::unbounded_channel();
26
27        let queue = Self {
28            jobs,
29            pending_tx,
30            pending_rx: Arc::new(tokio::sync::Mutex::new(pending_rx)),
31            storage,
32            collection: "__aurora_jobs".to_string(),
33        };
34
35        // Load existing jobs from storage
36        queue.load_jobs()?;
37
38        Ok(queue)
39    }
40
41    /// Load jobs from persistent storage into memory
42    fn load_jobs(&self) -> Result<()> {
43        let prefix = format!("{}:", self.collection);
44
45        for entry in self.storage.scan_prefix(&prefix) {
46            if let Ok((key, value)) = entry
47                && let Ok(job) = bincode::deserialize::<Job>(&value)
48            {
49                if !matches!(job.status, JobStatus::Completed) {
50                    let job_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
51                    self.jobs.insert(job_id.clone(), job);
52                    
53                    // Re-enqueue pending jobs into fast-path
54                    let _ = self.pending_tx.send(job_id);
55                }
56            }
57        }
58
59        Ok(())
60    }
61
62    /// Enqueue a new job
63    pub async fn enqueue(&self, job: Job) -> Result<String> {
64        let job_id = job.id.clone();
65        let key = format!("{}:{}", self.collection, job_id);
66
67        // 1. ATOMIC FAST-PATH: Add to in-memory index immediately
68        self.jobs.insert(job_id.clone(), job.clone());
69
70        // 2. PERSISTENCE: Write to storage (Crucial for durability)
71        // Optimization: In high-throughput scenarios, we could batch these.
72        let serialized = bincode::serialize(&job)
73            .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
74        self.storage.set(key, serialized)?;
75
76        // 3. NOTIFY: Wake up waiting workers instantly via channel
77        let _ = self.pending_tx.send(job_id.clone());
78
79        Ok(job_id)
80    }
81
82    /// Dequeue next job (O(1) via channel instead of O(N) scan)
83    pub async fn dequeue(&self) -> Result<Option<Job>> {
84        let mut rx = self.pending_rx.lock().await;
85        
86        while let Some(job_id) = rx.recv().await {
87            // Check for shutdown sentinel
88            if job_id == "__SHUTDOWN__" {
89                return Ok(None);
90            }
91
92            // Check if job is still valid and pending
93            if let Some(mut job) = self.jobs.get_mut(&job_id) {
94                if matches!(job.status, JobStatus::Pending | JobStatus::Failed { .. }) && job.should_run() {
95                    // Mark as running
96                    job.mark_running();
97                    let job_clone = job.clone();
98                    drop(job);
99                    
100                    // Persist state change (Pending -> Running)
101                    self.update_job(&job_id, job_clone.clone()).await?;
102                    
103                    return Ok(Some(job_clone));
104                }
105            }
106        }
107        
108        Ok(None)
109    }
110
111    /// Update job status
112    pub async fn update_job(&self, job_id: &str, job: Job) -> Result<()> {
113        let key = format!("{}:{}", self.collection, job_id);
114
115        // Update in-memory index first (O(1))
116        self.jobs.insert(job_id.to_string(), job.clone());
117
118        // Persist to storage
119        let serialized = bincode::serialize(&job)
120            .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
121        self.storage.set(key, serialized)?;
122
123        Ok(())
124    }
125
126    /// Shutdown the queue (wakes up all waiting workers)
127    pub async fn shutdown(&self) {
128        // We use a sentinel string because the receiver is locked in dequeue().
129        // Sending this will wake up the worker waiting on rx.recv().await.
130        let _ = self.pending_tx.send("__SHUTDOWN__".to_string());
131    }
132
133    // Dummy method for backward compatibility with executor.rs notify calls
134    pub fn notify_all(&self) {}
135    pub async fn notified(&self) {
136        // Not used anymore due to channel-based dequeue
137    }
138
139    /// Get job by ID
140    pub async fn get(&self, job_id: &str) -> Result<Option<Job>> {
141        Ok(self.jobs.get(job_id).map(|j| j.clone()))
142    }
143
144    /// Get job status
145    pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
146        Ok(self.jobs.get(job_id).map(|j| j.status.clone()))
147    }
148
149    /// Remove completed jobs (cleanup)
150    pub async fn cleanup_completed(&self) -> Result<usize> {
151        let mut removed = 0;
152        let to_remove: Vec<String> = self.jobs.iter()
153            .filter(|entry| matches!(entry.value().status, JobStatus::Completed))
154            .map(|entry| entry.key().clone())
155            .collect();
156
157        for job_id in to_remove {
158            let key = format!("{}:{}", self.collection, job_id);
159            let _ = self.storage.delete(&key);
160            self.jobs.remove(&job_id);
161            removed += 1;
162        }
163        Ok(removed)
164    }
165
166    /// Get queue statistics
167    pub async fn stats(&self) -> Result<super::QueueStats> {
168        let mut stats = super::QueueStats {
169            pending: 0, running: 0, completed: 0, failed: 0, dead_letter: 0,
170        };
171        for entry in self.jobs.iter() {
172            match &entry.value().status {
173                JobStatus::Pending => stats.pending += 1,
174                JobStatus::Running => stats.running += 1,
175                JobStatus::Completed => stats.completed += 1,
176                JobStatus::Failed { .. } => stats.failed += 1,
177                JobStatus::DeadLetter { .. } => stats.dead_letter += 1,
178            }
179        }
180        Ok(stats)
181    }
182
183    /// Find zombie jobs
184    pub async fn find_zombie_jobs(&self) -> Vec<String> {
185        self.jobs.iter()
186            .filter(|entry| entry.value().is_heartbeat_expired())
187            .map(|entry| entry.key().clone())
188            .collect()
189    }
190}