1use async_trait::async_trait;
4use chrono::{DateTime, Duration, Utc};
5use std::cmp::Ordering;
6use std::collections::{BinaryHeap, HashMap};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11use crate::backend::{QueueBackend, QueueError};
12use crate::job::{JobEntry, JobStatus};
13
14#[derive(Debug, Clone, Eq, PartialEq)]
16struct PriorityEntry {
17 run_at: DateTime<Utc>,
18 id: Uuid,
19}
20
21impl Ord for PriorityEntry {
22 fn cmp(&self, other: &Self) -> Ordering {
23 other.run_at.cmp(&self.run_at)
25 }
26}
27
28impl PartialOrd for PriorityEntry {
29 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
30 Some(self.cmp(other))
31 }
32}
33
34#[derive(Debug, Default)]
35pub struct MemoryQueue {
36 jobs: Arc<RwLock<HashMap<Uuid, JobEntry>>>,
37 queue: Arc<RwLock<BinaryHeap<PriorityEntry>>>,
38}
39
40impl MemoryQueue {
41 pub fn new() -> Self {
42 Self::default()
43 }
44}
45
46#[async_trait]
47impl QueueBackend for MemoryQueue {
48 async fn enqueue(
49 &self,
50 tenant_id: &str,
51 job_type: &str,
52 payload: serde_json::Value,
53 delay_secs: Option<u64>,
54 ) -> Result<Uuid, QueueError> {
55 let id = Uuid::new_v4();
56 let now = Utc::now();
57 let run_at = now + Duration::seconds(delay_secs.unwrap_or(0) as i64);
58
59 let entry = JobEntry {
60 id,
61 tenant_id: tenant_id.to_string(),
62 job_type: job_type.to_string(),
63 payload,
64 status: JobStatus::Pending,
65 created_at: now,
66 run_at,
67 attempts: 0,
68 last_error: None,
69 };
70
71 let mut jobs = self.jobs.write().await;
72 jobs.insert(id, entry);
73
74 let mut queue = self.queue.write().await;
75 queue.push(PriorityEntry { run_at, id });
76
77 Ok(id)
78 }
79
80 async fn dequeue(&self) -> Result<Option<JobEntry>, QueueError> {
81 let mut queue = self.queue.write().await;
82 let now = Utc::now();
83
84 if let Some(entry) = queue.peek() {
86 if entry.run_at <= now {
87 let entry = queue.pop().unwrap();
89 let mut jobs = self.jobs.write().await;
90
91 if let Some(job) = jobs.get_mut(&entry.id) {
92 if job.status == JobStatus::Pending {
94 job.status = JobStatus::Running;
95 return Ok(Some(job.clone()));
96 }
97 }
98 }
99 }
100
101 Ok(None)
102 }
103
104 async fn update_status(
105 &self,
106 id: Uuid,
107 status: JobStatus,
108 error: Option<String>,
109 delay_secs: Option<u64>,
110 ) -> Result<(), QueueError> {
111 let mut jobs = self.jobs.write().await;
112
113 if let Some(job) = jobs.get_mut(&id) {
114 job.status = status;
115 job.last_error = error;
116 job.attempts += if matches!(status, JobStatus::Failed(_)) {
117 1
118 } else {
119 0
120 };
121
122 if let JobStatus::Failed(retry_count) = status {
123 let backoff_secs = delay_secs.unwrap_or_else(|| {
125 2_u64.pow(retry_count.min(6)) });
127 let run_at = Utc::now() + Duration::seconds(backoff_secs as i64);
128 job.run_at = run_at;
129 job.status = JobStatus::Pending; tracing::debug!(
132 job_id = %id,
133 retry_count = retry_count,
134 delay_secs = backoff_secs,
135 "Re-queuing job with backoff"
136 );
137
138 let mut queue = self.queue.write().await;
139 queue.push(PriorityEntry { run_at, id });
140 }
141 }
142
143 Ok(())
144 }
145
146 async fn get_status(&self, id: Uuid) -> Result<JobStatus, QueueError> {
147 let jobs = self.jobs.read().await;
148 jobs.get(&id).map(|j| j.status).ok_or(QueueError::NotFound)
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A job enqueued without delay starts `Pending`, is handed out exactly
    /// once as `Running`, and leaves the queue empty afterwards.
    #[tokio::test]
    async fn test_enqueue_dequeue() {
        let backend = MemoryQueue::new();

        let job_id = backend
            .enqueue("test-tenant", "test_job", json!({ "foo": "bar" }), None)
            .await
            .unwrap();

        assert_eq!(
            backend.get_status(job_id).await.unwrap(),
            JobStatus::Pending
        );

        let claimed = backend.dequeue().await.unwrap().expect("Should have job");
        assert_eq!(claimed.id, job_id);
        assert_eq!(claimed.job_type, "test_job");
        assert_eq!(claimed.status, JobStatus::Running);

        // Nothing else was enqueued, so a second dequeue finds nothing.
        assert!(backend.dequeue().await.unwrap().is_none());
    }

    /// A job enqueued with a one-second delay is withheld until the delay
    /// has elapsed.
    #[tokio::test]
    async fn test_delayed_job() {
        let backend = MemoryQueue::new();

        let job_id = backend
            .enqueue("test-tenant", "delayed", json!({}), Some(1))
            .await
            .unwrap();

        // Not due yet.
        assert!(backend.dequeue().await.unwrap().is_none());

        // Wait slightly longer than the one-second delay.
        let wait = std::time::Duration::from_millis(1100);
        tokio::time::sleep(wait).await;

        let claimed = backend
            .dequeue()
            .await
            .unwrap()
            .expect("Should have delayed job");
        assert_eq!(claimed.id, job_id);
    }
}