1use async_trait::async_trait;
4use chrono::{DateTime, Duration, Utc};
5use std::cmp::Ordering;
6use std::collections::{BinaryHeap, HashMap};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11use crate::backend::{QueueBackend, QueueError};
12use crate::job::{JobEntry, JobStatus};
13
14#[derive(Debug, Clone, Eq, PartialEq)]
16struct PriorityEntry {
17 run_at: DateTime<Utc>,
18 id: Uuid,
19}
20
21impl Ord for PriorityEntry {
22 fn cmp(&self, other: &Self) -> Ordering {
23 other.run_at.cmp(&self.run_at)
25 }
26}
27
28impl PartialOrd for PriorityEntry {
29 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
30 Some(self.cmp(other))
31 }
32}
33
34#[derive(Debug, Default)]
35pub struct MemoryQueue {
36 jobs: Arc<RwLock<HashMap<Uuid, JobEntry>>>,
37 queue: Arc<RwLock<BinaryHeap<PriorityEntry>>>,
38}
39
40impl MemoryQueue {
41 pub fn new() -> Self {
42 Self::default()
43 }
44}
45
46#[async_trait]
47impl QueueBackend for MemoryQueue {
48 async fn enqueue(
49 &self,
50 tenant_id: &str,
51 job_type: &str,
52 payload: serde_json::Value,
53 delay_secs: Option<u64>,
54 ) -> Result<Uuid, QueueError> {
55 let id = Uuid::new_v4();
56 let now = Utc::now();
57 let run_at = now + Duration::seconds(delay_secs.unwrap_or(0) as i64);
58
59 let entry = JobEntry {
60 id,
61 tenant_id: tenant_id.to_string(),
62 job_type: job_type.to_string(),
63 payload,
64 status: JobStatus::Pending,
65 created_at: now,
66 run_at,
67 attempts: 0,
68 last_error: None,
69 result: None,
70 };
71
72 let mut jobs = self.jobs.write().await;
73 jobs.insert(id, entry);
74
75 let mut queue = self.queue.write().await;
76 queue.push(PriorityEntry { run_at, id });
77
78 Ok(id)
79 }
80
81 async fn dequeue(&self) -> Result<Option<JobEntry>, QueueError> {
82 let mut queue = self.queue.write().await;
83 let now = Utc::now();
84
85 if let Some(entry) = queue.peek() {
87 if entry.run_at <= now {
88 let entry = queue.pop().unwrap();
90 let mut jobs = self.jobs.write().await;
91
92 if let Some(job) = jobs.get_mut(&entry.id) {
93 if job.status == JobStatus::Pending {
95 job.status = JobStatus::Running;
96 return Ok(Some(job.clone()));
97 }
98 }
99 }
100 }
101
102 Ok(None)
103 }
104
105 async fn update_status(
106 &self,
107 id: Uuid,
108 status: JobStatus,
109 error: Option<String>,
110 delay_secs: Option<u64>,
111 ) -> Result<(), QueueError> {
112 let mut jobs = self.jobs.write().await;
113
114 if let Some(job) = jobs.get_mut(&id) {
115 job.status = status;
116 job.last_error = error;
117 job.attempts += if matches!(status, JobStatus::Failed(_)) {
118 1
119 } else {
120 0
121 };
122
123 if let JobStatus::Failed(retry_count) = status {
124 let backoff_secs = delay_secs.unwrap_or_else(|| {
126 2_u64.pow(retry_count.min(6)) });
128 let run_at = Utc::now() + Duration::seconds(backoff_secs as i64);
129 job.run_at = run_at;
130 job.status = JobStatus::Pending; tracing::debug!(
133 job_id = %id,
134 retry_count = retry_count,
135 delay_secs = backoff_secs,
136 "Re-queuing job with backoff"
137 );
138
139 let mut queue = self.queue.write().await;
140 queue.push(PriorityEntry { run_at, id });
141 }
142 }
143
144 Ok(())
145 }
146
147 async fn get_status(&self, id: Uuid) -> Result<JobStatus, QueueError> {
148 let jobs = self.jobs.read().await;
149 jobs.get(&id).map(|j| j.status).ok_or(QueueError::NotFound)
150 }
151
152 async fn get_job(&self, id: Uuid) -> Result<JobEntry, QueueError> {
153 let jobs = self.jobs.read().await;
154 jobs.get(&id).cloned().ok_or(QueueError::NotFound)
155 }
156
157 async fn set_result(&self, id: Uuid, result: serde_json::Value) -> Result<(), QueueError> {
158 let mut jobs = self.jobs.write().await;
159 if let Some(job) = jobs.get_mut(&id) {
160 job.result = Some(result);
161 Ok(())
162 } else {
163 Err(QueueError::NotFound)
164 }
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171 use serde_json::json;
172
173 #[tokio::test]
174 async fn test_enqueue_dequeue() {
175 let queue = MemoryQueue::new();
176 let payload = json!({ "foo": "bar" });
177
178 let id = queue
180 .enqueue("test-tenant", "test_job", payload.clone(), None)
181 .await
182 .unwrap();
183
184 let status = queue.get_status(id).await.unwrap();
185 assert_eq!(status, JobStatus::Pending);
186
187 let job = queue.dequeue().await.unwrap().expect("Should have job");
189 assert_eq!(job.id, id);
190 assert_eq!(job.job_type, "test_job");
191 assert_eq!(job.status, JobStatus::Running);
192
193 let empty = queue.dequeue().await.unwrap();
195 assert!(empty.is_none());
196 }
197
198 #[tokio::test]
199 async fn test_delayed_job() {
200 let queue = MemoryQueue::new();
201 let payload = json!({});
202
203 let id = queue
205 .enqueue("test-tenant", "delayed", payload, Some(1))
206 .await
207 .unwrap();
208
209 let job = queue.dequeue().await.unwrap();
211 assert!(job.is_none());
212
213 tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
215
216 let job = queue
218 .dequeue()
219 .await
220 .unwrap()
221 .expect("Should have delayed job");
222 assert_eq!(job.id, id);
223 }
224}