Skip to main content

oxidite_queue/
queue.rs

1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5use crate::job::{JobWrapper, JobStatus};
6use crate::stats::StatsTracker;
7use crate::Result;
8
9/// Queue backend trait
10#[async_trait]
11pub trait QueueBackend: Send + Sync {
12    async fn enqueue(&self, job: JobWrapper) -> Result<()>;
13    async fn dequeue(&self) -> Result<Option<JobWrapper>>;
14    async fn complete(&self, job_id: &str) -> Result<()>;
15    async fn fail(&self, job_id: &str, error: String) -> Result<()>;
16    async fn retry(&self, job: JobWrapper) -> Result<()>;
17    async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()>;
18    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>>;
19    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()>;
20}
21
22/// In-memory queue backend
23pub struct MemoryBackend {
24    queue: Arc<Mutex<VecDeque<JobWrapper>>>,
25    dead_letter: Arc<Mutex<Vec<JobWrapper>>>,
26}
27
28impl MemoryBackend {
29    pub fn new() -> Self {
30        Self {
31            queue: Arc::new(Mutex::new(VecDeque::new())),
32            dead_letter: Arc::new(Mutex::new(Vec::new())),
33        }
34    }
35}
36
37impl Default for MemoryBackend {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43#[async_trait]
44impl QueueBackend for MemoryBackend {
45    async fn enqueue(&self, mut job: JobWrapper) -> Result<()> {
46        job.status = JobStatus::Pending;
47        let mut queue = self.queue.lock().await;
48        
49        // Insert based on priority (higher priority first)
50        let pos = queue.iter().position(|j| j.priority < job.priority)
51            .unwrap_or(queue.len());
52        queue.insert(pos, job);
53        
54        Ok(())
55    }
56
57    async fn dequeue(&self) -> Result<Option<JobWrapper>> {
58        let mut queue = self.queue.lock().await;
59        
60        // Find first job that can be run now
61        let now = chrono::Utc::now().timestamp();
62        let pos = queue.iter().position(|j| {
63            j.status == JobStatus::Pending &&
64            j.scheduled_at.map(|t| t <= now).unwrap_or(true)
65        });
66
67        if let Some(pos) = pos {
68            let mut job = queue.remove(pos).unwrap();
69            job.status = JobStatus::Running;
70            job.attempts += 1;
71            Ok(Some(job))
72        } else {
73            Ok(None)
74        }
75    }
76
77    async fn complete(&self, _job_id: &str) -> Result<()> {
78        // In memory backend doesn't need to track completed jobs
79        Ok(())
80    }
81
82    async fn fail(&self, _job_id: &str, _error: String) -> Result<()> {
83        // In memory backend doesn't need to track failed jobs
84        Ok(())
85    }
86
87    async fn retry(&self, job: JobWrapper) -> Result<()> {
88        self.enqueue(job).await
89    }
90
91    async fn move_to_dead_letter(&self, mut job: JobWrapper) -> Result<()> {
92        job.status = JobStatus::DeadLetter;
93        let mut dlq = self.dead_letter.lock().await;
94        dlq.push(job);
95        Ok(())
96    }
97
98    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
99        let dlq = self.dead_letter.lock().await;
100        Ok(dlq.clone())
101    }
102
103    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
104        let mut dlq = self.dead_letter.lock().await;
105        if let Some(pos) = dlq.iter().position(|j| j.id == job_id) {
106            let mut job = dlq.remove(pos);
107            job.status = JobStatus::Pending;
108            job.attempts = 0;
109            job.error = None;
110            drop(dlq); // Release lock before enqueue
111            self.enqueue(job).await?;
112        }
113        Ok(())
114    }
115}
116
117/// Queue for managing jobs
118pub struct Queue {
119    backend: Arc<dyn QueueBackend>,
120    stats: StatsTracker,
121}
122
123impl Queue {
124    pub fn new(backend: Arc<dyn QueueBackend>) -> Self {
125        Self {
126            backend,
127            stats: StatsTracker::new(),
128        }
129    }
130
131    pub fn memory() -> Self {
132        Self::new(Arc::new(MemoryBackend::new()))
133    }
134
135    pub async fn enqueue(&self, job: JobWrapper) -> Result<String> {
136        let job_id = job.id.clone();
137        self.backend.enqueue(job).await?;
138        self.stats.increment_enqueued().await;
139        Ok(job_id)
140    }
141
142    pub async fn dequeue(&self) -> Result<Option<JobWrapper>> {
143        let job = self.backend.dequeue().await?;
144        if job.is_some() {
145            self.stats.mark_running().await;
146        }
147        Ok(job)
148    }
149
150    pub async fn complete(&self, job_id: &str) -> Result<()> {
151        self.backend.complete(job_id).await?;
152        self.stats.increment_processed().await;
153        Ok(())
154    }
155
156    pub async fn fail(&self, job_id: &str, error: String) -> Result<()> {
157        self.backend.fail(job_id, error).await?;
158        self.stats.increment_failed().await;
159        Ok(())
160    }
161
162    pub async fn retry(&self, job: JobWrapper) -> Result<()> {
163        self.backend.retry(job).await?;
164        self.stats.increment_retried().await;
165        Ok(())
166    }
167
168    pub async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
169        self.backend.move_to_dead_letter(job).await?;
170        self.stats.increment_dead_letter().await;
171        Ok(())
172    }
173
174    pub async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
175        self.backend.list_dead_letter().await
176    }
177
178    pub async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
179        self.backend.retry_from_dead_letter(job_id).await
180    }
181
182    pub async fn get_stats(&self) -> crate::stats::QueueStats {
183        self.stats.get_stats().await
184    }
185}
186
187#[cfg(test)]
188mod tests {
189    use super::*;
190    use crate::job::Job;
191    use serde::{Deserialize, Serialize};
192
193    #[derive(Serialize, Deserialize)]
194    struct TestJob {
195        value: i32,
196    }
197
198    #[async_trait::async_trait]
199    impl Job for TestJob {
200        async fn perform(&self) -> crate::Result<()> {
201            Ok(())
202        }
203    }
204
205    #[tokio::test]
206    async fn test_enqueue_dequeue() {
207        let queue = Queue::memory();
208        let job = JobWrapper::new(&TestJob { value: 42 }).unwrap();
209        
210        queue.enqueue(job).await.unwrap();
211        let dequeued = queue.dequeue().await.unwrap();
212        
213        assert!(dequeued.is_some());
214    }
215}