oxidite_queue/postgres.rs

use async_trait::async_trait;
use sqlx::{PgPool, Row};
use crate::{QueueBackend, job::JobWrapper, Result, QueueError};

pub struct PostgresBackend {
    pool: PgPool,
    table_name: String,
    dlq_table_name: String,
}
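
// Two tables back this struct: `table_name` holds live jobs, and a companion
// table named `{table_name}_dlq` (created alongside it) holds jobs whose
// attempts were exhausted.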

impl PostgresBackend {
    pub async fn new(pool: PgPool, table_name: &str) -> Result<Self> {
        let backend = Self {
            pool,
            table_name: table_name.to_string(),
            dlq_table_name: format!("{}_dlq", table_name),
        };

        backend.init_tables().await?;

        Ok(backend)
    }

    async fn init_tables(&self) -> Result<()> {
        // Main job table: one row per queued job, with the serialized wrapper
        // stored as JSONB next to the columns used for scheduling and ordering.
        sqlx::query(&format!(
            r#"CREATE TABLE IF NOT EXISTS {} (
                id TEXT PRIMARY KEY,
                payload JSONB NOT NULL,
                priority INTEGER DEFAULT 0,
                attempts INTEGER DEFAULT 0,
                max_attempts INTEGER DEFAULT 3,
                scheduled_at TIMESTAMP WITH TIME ZONE,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                status VARCHAR(20) DEFAULT 'pending'
            )"#,
            self.table_name
        ))
        .execute(&self.pool)
        .await
        .map_err(|e| QueueError::BackendError(e.to_string()))?;

        // Dead-letter table: jobs that ran out of attempts, plus the last error.
        sqlx::query(&format!(
            r#"CREATE TABLE IF NOT EXISTS {} (
                id TEXT PRIMARY KEY,
                payload JSONB NOT NULL,
                priority INTEGER DEFAULT 0,
                attempts INTEGER DEFAULT 0,
                error TEXT,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                status VARCHAR(20) DEFAULT 'dead_letter'
            )"#,
            self.dlq_table_name
        ))
        .execute(&self.pool)
        .await
        .map_err(|e| QueueError::BackendError(e.to_string()))?;

        Ok(())
    }
}

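// Construction sketch (illustration only, not part of the original module;
// `example_setup`, the connection string, and the "jobs" table name are
// placeholders chosen for this example):
#[allow(dead_code)]
async fn example_setup() -> Result<PostgresBackend> {
    use sqlx::postgres::PgPoolOptions;

    // Hypothetical connection string; adjust for your environment.
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://localhost/oxidite")
        .await
        .map_err(|e| QueueError::BackendError(e.to_string()))?;

    // `new` runs `init_tables`, so "jobs" and "jobs_dlq" exist afterwards.
    PostgresBackend::new(pool, "jobs").await
}
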
#[async_trait]
impl QueueBackend for PostgresBackend {
    async fn enqueue(&self, mut job: JobWrapper) -> Result<()> {
        job.status = crate::job::JobStatus::Pending;

        sqlx::query(&format!(
            r#"INSERT INTO {} (id, payload, priority, attempts, max_attempts, scheduled_at, status)
               VALUES ($1, $2, $3, $4, $5, $6, $7)"#,
            self.table_name
        ))
        .bind(&job.id)
        .bind(serde_json::to_value(&job)?)
        .bind(job.priority)
        .bind(job.attempts as i32)
        .bind(job.max_retries as i32)
        // `from_timestamp` returns an Option, so `and_then` keeps this a
        // single Option<DateTime<Utc>> (NULL when unscheduled or out of range).
        .bind(job.scheduled_at.and_then(|t| chrono::DateTime::<chrono::Utc>::from_timestamp(t, 0)))
        .bind("pending")
        .execute(&self.pool)
        .await
        .map_err(|e| QueueError::BackendError(e.to_string()))?;

        Ok(())
    }

    async fn dequeue(&self) -> Result<Option<JobWrapper>> {
        // Claim the next runnable job atomically: the inner SELECT uses
        // FOR UPDATE SKIP LOCKED so concurrent workers never grab the same row.
        let row = sqlx::query(&format!(
            r#"UPDATE {}
               SET status = 'running', attempts = attempts + 1, updated_at = NOW()
               WHERE id = (
                   SELECT id FROM {}
                   WHERE status = 'pending'
                     AND (scheduled_at IS NULL OR scheduled_at <= NOW())
                   ORDER BY priority DESC, created_at ASC
                   LIMIT 1
                   FOR UPDATE SKIP LOCKED
               )
               RETURNING payload"#,
            self.table_name, self.table_name
        ))
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| QueueError::BackendError(e.to_string()))?;

        if let Some(row) = row {
            let payload: serde_json::Value = row.try_get("payload")
                .map_err(|e| QueueError::BackendError(e.to_string()))?;
            let mut job: JobWrapper = serde_json::from_value(payload)
                .map_err(QueueError::SerializationError)?;
            job.status = crate::job::JobStatus::Running;
            // Mirror the `attempts = attempts + 1` done in the UPDATE above,
            // since the stored payload still carries the old count.
            job.attempts += 1;
            Ok(Some(job))
        } else {
            Ok(None)
        }
    }

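    // Completion is destructive: the row is deleted rather than marked done,
    // so no history of completed jobs is kept in the main table.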
    async fn complete(&self, job_id: &str) -> Result<()> {
        sqlx::query(&format!(
            r#"DELETE FROM {} WHERE id = $1"#,
            self.table_name
        ))
        .bind(job_id)
        .execute(&self.pool)
        .await
        .map_err(|e| QueueError::BackendError(e.to_string()))?;

        Ok(())
    }

    async fn fail(&self, job_id: &str, error: String) -> Result<()> {
        // The jobs table stores the retry ceiling in the `max_attempts`
        // column (see init_tables), so that is the column to read here.
        let row = sqlx::query(&format!(
            r#"SELECT attempts, max_attempts FROM {} WHERE id = $1"#,
            self.table_name
        ))
        .bind(job_id)
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| QueueError::BackendError(e.to_string()))?;

        if let Some(row) = row {
            let attempts: i32 = row.try_get("attempts")
                .map_err(|e| QueueError::BackendError(e.to_string()))?;
            let max_attempts: i32 = row.try_get("max_attempts")
                .map_err(|e| QueueError::BackendError(e.to_string()))?;

            if attempts >= max_attempts {
                // Out of attempts: park the job in the dead-letter table.
                self.move_to_dead_letter_with_error(job_id, error).await?;
            } else {
                // Attempts remain: release the job back to 'pending' so a
                // worker can pick it up again.
                sqlx::query(&format!(
                    r#"UPDATE {} SET status = 'pending', updated_at = NOW() WHERE id = $1"#,
                    self.table_name
                ))
                .bind(job_id)
                .execute(&self.pool)
                .await
                .map_err(|e| QueueError::BackendError(e.to_string()))?;
            }
        }

        Ok(())
    }

    async fn retry(&self, mut job: JobWrapper) -> Result<()> {
        // Drop the stale row, reset the bookkeeping, and enqueue from scratch.
        sqlx::query(&format!(
            r#"DELETE FROM {} WHERE id = $1"#,
            self.table_name
        ))
        .bind(&job.id)
        .execute(&self.pool)
        .await
        .map_err(|e| QueueError::BackendError(e.to_string()))?;

        job.attempts = 0;
        job.status = crate::job::JobStatus::Pending;

        self.enqueue(job).await
    }

    async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
        let error = job.error.unwrap_or_else(|| "Unknown error".to_string());
        self.move_to_dead_letter_with_error(&job.id, error).await
    }

    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
        let rows = sqlx::query(&format!(
            r#"SELECT payload FROM {} ORDER BY created_at DESC"#,
            self.dlq_table_name
        ))
        .fetch_all(&self.pool)
        .await
        .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let mut jobs = Vec::new();
        for row in rows {
            let payload: serde_json::Value = row.try_get("payload")
                .map_err(|e| QueueError::BackendError(e.to_string()))?;
            // Rows whose payload no longer deserializes into JobWrapper are
            // skipped rather than failing the whole listing.
            if let Ok(job) = serde_json::from_value::<JobWrapper>(payload) {
                jobs.push(job);
            }
        }

        Ok(jobs)
    }

    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
        let row = sqlx::query(&format!(
            r#"SELECT payload FROM {} WHERE id = $1"#,
            self.dlq_table_name
        ))
        .bind(job_id)
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| QueueError::BackendError(e.to_string()))?;

        if let Some(row) = row {
            let payload: serde_json::Value = row.try_get("payload")
                .map_err(|e| QueueError::BackendError(e.to_string()))?;
            let mut job: JobWrapper = serde_json::from_value(payload)
                .map_err(QueueError::SerializationError)?;

            // Remove the DLQ copy before putting the job back on the queue.
            sqlx::query(&format!(
                r#"DELETE FROM {} WHERE id = $1"#,
                self.dlq_table_name
            ))
            .bind(job_id)
            .execute(&self.pool)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

            // Reset bookkeeping so the job gets a fresh round of attempts.
            job.status = crate::job::JobStatus::Pending;
            job.attempts = 0;
            job.error = None;

            self.enqueue(job).await?;
        }

        Ok(())
    }
}

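// Private helper shared by `fail` and `move_to_dead_letter`: copies the row
// into the DLQ table with the final error, then deletes it from the main
// table. Note that the INSERT and DELETE run as two separate statements, not
// inside a single transaction.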
impl PostgresBackend {
    async fn move_to_dead_letter_with_error(&self, job_id: &str, error: String) -> Result<()> {
        let row = sqlx::query(&format!(
            r#"SELECT payload, priority, attempts FROM {} WHERE id = $1"#,
            self.table_name
        ))
        .bind(job_id)
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| QueueError::BackendError(e.to_string()))?;

        if let Some(row) = row {
            let payload: serde_json::Value = row.try_get("payload")
                .map_err(|e| QueueError::BackendError(e.to_string()))?;
            let priority: i32 = row.try_get("priority")
                .map_err(|e| QueueError::BackendError(e.to_string()))?;
            let attempts: i32 = row.try_get("attempts")
                .map_err(|e| QueueError::BackendError(e.to_string()))?;

            sqlx::query(&format!(
                r#"INSERT INTO {} (id, payload, priority, attempts, error)
                   VALUES ($1, $2, $3, $4, $5)"#,
                self.dlq_table_name
            ))
            .bind(job_id)
            .bind(&payload)
            .bind(priority)
            .bind(attempts)
            .bind(error)
            .execute(&self.pool)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

            sqlx::query(&format!(
                r#"DELETE FROM {} WHERE id = $1"#,
                self.table_name
            ))
            .bind(job_id)
            .execute(&self.pool)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;
        }

        Ok(())
    }
}
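
// Lifecycle sketch (illustration only, not part of the original module;
// `run_one` is a hypothetical helper name): dequeue a job, report an outcome,
// then requeue anything that has landed in the dead-letter table.
#[allow(dead_code)]
async fn run_one(backend: &PostgresBackend) -> Result<()> {
    if let Some(job) = backend.dequeue().await? {
        // A real worker would execute the job here; pretend it failed.
        backend.fail(&job.id, "simulated failure".to_string()).await?;
    }

    // Inspect the dead-letter queue and push everything back for another try.
    for dead in backend.list_dead_letter().await? {
        backend.retry_from_dead_letter(&dead.id).await?;
    }

    Ok(())
}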