1use async_trait::async_trait;
4use chrono::{DateTime, Duration, Utc};
5use std::cmp::Ordering;
6use std::collections::{BinaryHeap, HashMap};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11use crate::backend::{QueueBackend, QueueError};
12use crate::job::{JobEntry, JobStatus};
13
14#[derive(Debug, Clone, Eq, PartialEq)]
16struct PriorityEntry {
17 run_at: DateTime<Utc>,
18 id: Uuid,
19}
20
21impl Ord for PriorityEntry {
22 fn cmp(&self, other: &Self) -> Ordering {
23 other.run_at.cmp(&self.run_at)
25 }
26}
27
28impl PartialOrd for PriorityEntry {
29 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
30 Some(self.cmp(other))
31 }
32}
33
34#[derive(Debug, Default)]
35pub struct MemoryQueue {
36 jobs: Arc<RwLock<HashMap<Uuid, JobEntry>>>,
37 queue: Arc<RwLock<BinaryHeap<PriorityEntry>>>,
38}
39
40impl MemoryQueue {
41 pub fn new() -> Self {
42 Self::default()
43 }
44}
45
46#[async_trait]
47impl QueueBackend for MemoryQueue {
48 async fn enqueue(
49 &self,
50 tenant_id: &str,
51 job_type: &str,
52 payload: serde_json::Value,
53 delay_secs: Option<u64>,
54 ) -> Result<Uuid, QueueError> {
55 let id = Uuid::new_v4();
56 let now = Utc::now();
57 let run_at = now + Duration::seconds(delay_secs.unwrap_or(0) as i64);
58
59 let entry = JobEntry {
60 id,
61 tenant_id: tenant_id.to_string(),
62 job_type: job_type.to_string(),
63 payload,
64 status: JobStatus::Pending,
65 created_at: now,
66 run_at,
67 attempts: 0,
68 last_error: None,
69 result: None,
70 };
71
72 let mut jobs = self.jobs.write().await;
73 jobs.insert(id, entry);
74
75 let mut queue = self.queue.write().await;
76 queue.push(PriorityEntry { run_at, id });
77
78 Ok(id)
79 }
80
81 async fn dequeue(&self) -> Result<Option<JobEntry>, QueueError> {
82 let mut queue = self.queue.write().await;
83 let now = Utc::now();
84
85 if let Some(entry) = queue.peek() {
87 if entry.run_at <= now {
88 let entry = queue.pop().unwrap();
90 let mut jobs = self.jobs.write().await;
91
92 if let Some(job) = jobs.get_mut(&entry.id) {
93 if job.status == JobStatus::Pending {
95 job.status = JobStatus::Running;
96 return Ok(Some(job.clone()));
97 }
98 }
99 }
100 }
101
102 Ok(None)
103 }
104
105 async fn update_status(
106 &self,
107 id: Uuid,
108 status: JobStatus,
109 error: Option<String>,
110 delay_secs: Option<u64>,
111 ) -> Result<(), QueueError> {
112 let mut jobs = self.jobs.write().await;
113
114 if let Some(job) = jobs.get_mut(&id) {
115 job.status = status;
116 job.last_error = error;
117 job.attempts += if matches!(status, JobStatus::Failed(_)) {
118 1
119 } else {
120 0
121 };
122
123 if let JobStatus::Failed(retry_count) = status {
124 let backoff_secs = delay_secs.unwrap_or_else(|| {
126 2_u64.pow(retry_count.min(6)) });
128 let run_at = Utc::now() + Duration::seconds(backoff_secs as i64);
129 job.run_at = run_at;
130 job.status = JobStatus::Pending; tracing::debug!(
133 job_id = %id,
134 retry_count = retry_count,
135 delay_secs = backoff_secs,
136 "Re-queuing job with backoff"
137 );
138
139 let mut queue = self.queue.write().await;
140 queue.push(PriorityEntry { run_at, id });
141 }
142 }
143
144 Ok(())
145 }
146
147 async fn get_status(&self, tenant_id: &str, id: Uuid) -> Result<JobStatus, QueueError> {
148 let jobs = self.jobs.read().await;
149 let job = jobs.get(&id).ok_or(QueueError::NotFound)?;
150
151 if job.tenant_id != tenant_id {
152 return Err(QueueError::NotFound);
153 }
154
155 Ok(job.status)
156 }
157
158 async fn get_job(&self, tenant_id: &str, id: Uuid) -> Result<JobEntry, QueueError> {
159 let jobs = self.jobs.read().await;
160 let job = jobs.get(&id).ok_or(QueueError::NotFound)?;
161
162 if job.tenant_id != tenant_id {
163 return Err(QueueError::NotFound);
164 }
165
166 Ok(job.clone())
167 }
168
169 async fn set_result(&self, id: Uuid, result: serde_json::Value) -> Result<(), QueueError> {
170 let mut jobs = self.jobs.write().await;
171 if let Some(job) = jobs.get_mut(&id) {
172 job.result = Some(result);
173 Ok(())
174 } else {
175 Err(QueueError::NotFound)
176 }
177 }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A job enqueued without delay is `Pending`, is handed out exactly once
    /// as `Running`, and leaves the queue empty afterwards.
    #[tokio::test]
    async fn test_enqueue_dequeue() {
        let backend = MemoryQueue::new();

        let job_id = backend
            .enqueue("test-tenant", "test_job", json!({ "foo": "bar" }), None)
            .await
            .unwrap();

        assert_eq!(
            backend.get_status("test-tenant", job_id).await.unwrap(),
            JobStatus::Pending
        );

        let claimed = backend.dequeue().await.unwrap().expect("Should have job");
        assert_eq!(claimed.id, job_id);
        assert_eq!(claimed.job_type, "test_job");
        assert_eq!(claimed.status, JobStatus::Running);

        // Nothing else was enqueued, so a second dequeue finds no work.
        assert!(backend.dequeue().await.unwrap().is_none());
    }

    /// A job enqueued with a one-second delay is withheld until the delay
    /// has elapsed.
    #[tokio::test]
    async fn test_delayed_job() {
        let backend = MemoryQueue::new();

        let job_id = backend
            .enqueue("test-tenant", "delayed", json!({}), Some(1))
            .await
            .unwrap();

        // Too early: the job is scheduled one second in the future.
        let too_early = backend.dequeue().await.unwrap();
        assert!(too_early.is_none());

        // Wait slightly longer than the delay before trying again.
        let wait = std::time::Duration::from_millis(1100);
        tokio::time::sleep(wait).await;

        let claimed = backend
            .dequeue()
            .await
            .unwrap()
            .expect("Should have delayed job");
        assert_eq!(claimed.id, job_id);
    }
}