1use async_trait::async_trait;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tokio::sync::Mutex;
5use crate::job::{JobWrapper, JobStatus};
6use crate::stats::StatsTracker;
7use crate::Result;
8
9#[async_trait]
11pub trait QueueBackend: Send + Sync {
12 async fn enqueue(&self, job: JobWrapper) -> Result<()>;
13 async fn dequeue(&self) -> Result<Option<JobWrapper>>;
14 async fn complete(&self, job_id: &str) -> Result<()>;
15 async fn fail(&self, job_id: &str, error: String) -> Result<()>;
16 async fn retry(&self, job: JobWrapper) -> Result<()>;
17 async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()>;
18 async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>>;
19 async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()>;
20}
21
22pub struct MemoryBackend {
24 queue: Arc<Mutex<VecDeque<JobWrapper>>>,
25 dead_letter: Arc<Mutex<Vec<JobWrapper>>>,
26}
27
28impl MemoryBackend {
29 pub fn new() -> Self {
30 Self {
31 queue: Arc::new(Mutex::new(VecDeque::new())),
32 dead_letter: Arc::new(Mutex::new(Vec::new())),
33 }
34 }
35}
36
37impl Default for MemoryBackend {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43#[async_trait]
44impl QueueBackend for MemoryBackend {
45 async fn enqueue(&self, mut job: JobWrapper) -> Result<()> {
46 job.status = JobStatus::Pending;
47 let mut queue = self.queue.lock().await;
48
49 let pos = queue.iter().position(|j| j.priority < job.priority)
51 .unwrap_or(queue.len());
52 queue.insert(pos, job);
53
54 Ok(())
55 }
56
57 async fn dequeue(&self) -> Result<Option<JobWrapper>> {
58 let mut queue = self.queue.lock().await;
59
60 let now = chrono::Utc::now().timestamp();
62 let pos = queue.iter().position(|j| {
63 j.status == JobStatus::Pending &&
64 j.scheduled_at.map(|t| t <= now).unwrap_or(true)
65 });
66
67 if let Some(pos) = pos {
68 let mut job = queue.remove(pos).unwrap();
69 job.status = JobStatus::Running;
70 job.attempts += 1;
71 Ok(Some(job))
72 } else {
73 Ok(None)
74 }
75 }
76
77 async fn complete(&self, _job_id: &str) -> Result<()> {
78 Ok(())
80 }
81
82 async fn fail(&self, _job_id: &str, _error: String) -> Result<()> {
83 Ok(())
85 }
86
87 async fn retry(&self, job: JobWrapper) -> Result<()> {
88 self.enqueue(job).await
89 }
90
91 async fn move_to_dead_letter(&self, mut job: JobWrapper) -> Result<()> {
92 job.status = JobStatus::DeadLetter;
93 let mut dlq = self.dead_letter.lock().await;
94 dlq.push(job);
95 Ok(())
96 }
97
98 async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
99 let dlq = self.dead_letter.lock().await;
100 Ok(dlq.clone())
101 }
102
103 async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
104 let mut dlq = self.dead_letter.lock().await;
105 if let Some(pos) = dlq.iter().position(|j| j.id == job_id) {
106 let mut job = dlq.remove(pos);
107 job.status = JobStatus::Pending;
108 job.attempts = 0;
109 job.error = None;
110 drop(dlq); self.enqueue(job).await?;
112 }
113 Ok(())
114 }
115}
116
117pub struct Queue {
119 backend: Arc<dyn QueueBackend>,
120 stats: StatsTracker,
121}
122
123impl Queue {
124 pub fn new(backend: Arc<dyn QueueBackend>) -> Self {
125 Self {
126 backend,
127 stats: StatsTracker::new(),
128 }
129 }
130
131 pub fn memory() -> Self {
132 Self::new(Arc::new(MemoryBackend::new()))
133 }
134
135 pub async fn enqueue(&self, job: JobWrapper) -> Result<String> {
136 let job_id = job.id.clone();
137 self.backend.enqueue(job).await?;
138 self.stats.increment_enqueued().await;
139 Ok(job_id)
140 }
141
142 pub async fn dequeue(&self) -> Result<Option<JobWrapper>> {
143 let job = self.backend.dequeue().await?;
144 if job.is_some() {
145 self.stats.mark_running().await;
146 }
147 Ok(job)
148 }
149
150 pub async fn complete(&self, job_id: &str) -> Result<()> {
151 self.backend.complete(job_id).await?;
152 self.stats.increment_processed().await;
153 Ok(())
154 }
155
156 pub async fn fail(&self, job_id: &str, error: String) -> Result<()> {
157 self.backend.fail(job_id, error).await?;
158 self.stats.increment_failed().await;
159 Ok(())
160 }
161
162 pub async fn retry(&self, job: JobWrapper) -> Result<()> {
163 self.backend.retry(job).await?;
164 self.stats.increment_retried().await;
165 Ok(())
166 }
167
168 pub async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
169 self.backend.move_to_dead_letter(job).await?;
170 self.stats.increment_dead_letter().await;
171 Ok(())
172 }
173
174 pub async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
175 self.backend.list_dead_letter().await
176 }
177
178 pub async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
179 self.backend.retry_from_dead_letter(job_id).await
180 }
181
182 pub async fn get_stats(&self) -> crate::stats::QueueStats {
183 self.stats.get_stats().await
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190 use crate::job::Job;
191 use serde::{Deserialize, Serialize};
192
193 #[derive(Serialize, Deserialize)]
194 struct TestJob {
195 value: i32,
196 }
197
198 #[async_trait::async_trait]
199 impl Job for TestJob {
200 async fn perform(&self) -> crate::Result<()> {
201 Ok(())
202 }
203 }
204
205 #[tokio::test]
206 async fn test_enqueue_dequeue() {
207 let queue = Queue::memory();
208 let job = JobWrapper::new(&TestJob { value: 42 }).unwrap();
209
210 queue.enqueue(job).await.unwrap();
211 let dequeued = queue.dequeue().await.unwrap();
212
213 assert!(dequeued.is_some());
214 }
215}