1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5use crate::job::{JobWrapper, JobStatus};
6use crate::stats::StatsTracker;
7use crate::Result;
8
/// Storage contract behind [`Queue`]: holds jobs waiting to run and jobs that
/// were moved to the dead-letter list.
///
/// Implementations must be `Send + Sync`; [`Queue`] stores one as
/// `Arc<dyn QueueBackend>`.
#[async_trait]
pub trait QueueBackend: Send + Sync {
    /// Stores `job` so that a later [`dequeue`](Self::dequeue) can hand it out.
    async fn enqueue(&self, job: JobWrapper) -> Result<()>;

    /// Hands out one job, or `Ok(None)` when the backend has nothing to offer.
    async fn dequeue(&self) -> Result<Option<JobWrapper>>;

    /// Reports that the job identified by `job_id` finished.
    async fn complete(&self, job_id: &str) -> Result<()>;

    /// Reports that the job identified by `job_id` failed, with `error` as the
    /// failure description.
    async fn fail(&self, job_id: &str, error: String) -> Result<()>;

    /// Puts `job` back into the backend for another run.
    async fn retry(&self, job: JobWrapper) -> Result<()>;

    /// Moves `job` into the dead-letter list.
    async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()>;

    /// Returns the jobs currently held in the dead-letter list.
    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>>;

    /// Re-queues the dead-lettered job identified by `job_id`.
    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()>;

    /// Removes stored jobs from the backend.
    // NOTE(review): whether the dead-letter list is included is up to the
    // implementation; the in-memory backend in this file leaves it untouched.
    async fn clear(&self) -> Result<()>;
}
22
/// [`QueueBackend`] implementation that keeps every job in process memory.
pub struct MemoryBackend {
    /// Jobs waiting to be dequeued. `enqueue` inserts each job ahead of the
    /// first entry whose priority is lower.
    queue: Arc<Mutex<VecDeque<JobWrapper>>>,
    /// Jobs handed to `move_to_dead_letter`, appended in arrival order.
    dead_letter: Arc<Mutex<Vec<JobWrapper>>>,
}
28
29impl MemoryBackend {
30 pub fn new() -> Self {
31 Self {
32 queue: Arc::new(Mutex::new(VecDeque::new())),
33 dead_letter: Arc::new(Mutex::new(Vec::new())),
34 }
35 }
36}
37
38impl Default for MemoryBackend {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44#[async_trait]
45impl QueueBackend for MemoryBackend {
46 async fn enqueue(&self, mut job: JobWrapper) -> Result<()> {
47 job.status = JobStatus::Pending;
48 let mut queue = self.queue.lock().await;
49
50 let pos = queue.iter().position(|j| j.priority < job.priority)
52 .unwrap_or(queue.len());
53 queue.insert(pos, job);
54
55 Ok(())
56 }
57
58 async fn dequeue(&self) -> Result<Option<JobWrapper>> {
59 let mut queue = self.queue.lock().await;
60
61 let now = chrono::Utc::now().timestamp();
63 let pos = queue.iter().position(|j| {
64 j.status == JobStatus::Pending &&
65 j.scheduled_at.map(|t| t <= now).unwrap_or(true)
66 });
67
68 if let Some(pos) = pos {
69 let mut job = queue.remove(pos).unwrap();
70 job.status = JobStatus::Running;
71 job.attempts += 1;
72 Ok(Some(job))
73 } else {
74 Ok(None)
75 }
76 }
77
78 async fn complete(&self, _job_id: &str) -> Result<()> {
79 Ok(())
81 }
82
83 async fn fail(&self, _job_id: &str, _error: String) -> Result<()> {
84 Ok(())
86 }
87
88 async fn retry(&self, job: JobWrapper) -> Result<()> {
89 self.enqueue(job).await
90 }
91
92 async fn move_to_dead_letter(&self, mut job: JobWrapper) -> Result<()> {
93 job.status = JobStatus::DeadLetter;
94 let mut dlq = self.dead_letter.lock().await;
95 dlq.push(job);
96 Ok(())
97 }
98
99 async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
100 let dlq = self.dead_letter.lock().await;
101 Ok(dlq.clone())
102 }
103
104 async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
105 let mut dlq = self.dead_letter.lock().await;
106 if let Some(pos) = dlq.iter().position(|j| j.id == job_id) {
107 let mut job = dlq.remove(pos);
108 job.status = JobStatus::Pending;
109 job.attempts = 0;
110 job.error = None;
111 drop(dlq); self.enqueue(job).await?;
113 }
114 Ok(())
115 }
116
117 async fn clear(&self) -> Result<()> {
118 let mut queue = self.queue.lock().await;
119 queue.clear();
120 Ok(())
121 }
122}
123
/// Job queue facade: forwards every operation to a [`QueueBackend`] and keeps
/// running counters in a [`StatsTracker`].
pub struct Queue {
    /// Backend that actually stores the jobs.
    backend: Arc<dyn QueueBackend>,
    /// Counters updated after backend calls succeed.
    stats: StatsTracker,
}
129
130impl Queue {
131 pub fn new(backend: Arc<dyn QueueBackend>) -> Self {
132 Self {
133 backend,
134 stats: StatsTracker::new(),
135 }
136 }
137
138 pub fn memory() -> Self {
139 Self::new(Arc::new(MemoryBackend::new()))
140 }
141
142 pub async fn enqueue(&self, job: JobWrapper) -> Result<String> {
143 let job_id = job.id.clone();
144 self.backend.enqueue(job).await?;
145 self.stats.increment_enqueued().await;
146 Ok(job_id)
147 }
148
149 pub async fn dequeue(&self) -> Result<Option<JobWrapper>> {
150 let job = self.backend.dequeue().await?;
151 if job.is_some() {
152 self.stats.mark_running().await;
153 }
154 Ok(job)
155 }
156
157 pub async fn complete(&self, job_id: &str) -> Result<()> {
158 self.backend.complete(job_id).await?;
159 self.stats.increment_processed().await;
160 Ok(())
161 }
162
163 pub async fn fail(&self, job_id: &str, error: String) -> Result<()> {
164 self.backend.fail(job_id, error).await?;
165 self.stats.increment_failed().await;
166 Ok(())
167 }
168
169 pub async fn retry(&self, job: JobWrapper) -> Result<()> {
170 self.backend.retry(job).await?;
171 self.stats.increment_retried().await;
172 Ok(())
173 }
174
175 pub async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
176 self.backend.move_to_dead_letter(job).await?;
177 self.stats.increment_dead_letter().await;
178 Ok(())
179 }
180
181 pub async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
182 self.backend.list_dead_letter().await
183 }
184
185 pub async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
186 self.backend.retry_from_dead_letter(job_id).await
187 }
188
189 pub async fn clear(&self) -> Result<()> {
190 self.backend.clear().await?;
191 self.stats.reset().await;
192 Ok(())
193 }
194
195 pub async fn get_stats(&self) -> crate::stats::QueueStats {
196 self.stats.get_stats().await
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use crate::job::Job;
    use serde::{Deserialize, Serialize};

    /// Minimal serializable payload used to build a `JobWrapper` in tests.
    #[derive(Serialize, Deserialize)]
    struct TestJob {
        value: i32,
    }

    #[async_trait::async_trait]
    impl Job for TestJob {
        async fn perform(&self) -> crate::Result<()> {
            Ok(())
        }
    }

    /// A job that was enqueued comes back from `dequeue` with the same id,
    /// marked as running, and is no longer in the queue afterwards.
    #[tokio::test]
    async fn test_enqueue_dequeue() {
        let queue = Queue::memory();
        let job = JobWrapper::new(&TestJob { value: 42 }).unwrap();

        let job_id = queue.enqueue(job).await.unwrap();
        let dequeued = queue
            .dequeue()
            .await
            .unwrap()
            .expect("the enqueued job should be available");

        // `assert!` with `==` avoids requiring `Debug` on the compared types.
        assert!(dequeued.id == job_id);
        assert!(dequeued.status == JobStatus::Running);

        // The job was removed from the queue, so nothing is left to hand out.
        assert!(queue.dequeue().await.unwrap().is_none());
    }

    /// Dequeuing from a queue that never received a job yields `None`.
    #[tokio::test]
    async fn test_dequeue_empty_queue() {
        let queue = Queue::memory();

        assert!(queue.dequeue().await.unwrap().is_none());
    }
}