Skip to main content

oxidite_queue/
postgres.rs

1use async_trait::async_trait;
2use sqlx::{PgPool, Row};
3use crate::{QueueBackend, job::JobWrapper, Result, QueueError};
4
5/// PostgreSQL queue backend
6pub struct PostgresBackend {
7    pool: PgPool,
8    table_name: String,
9    dlq_table_name: String,
10}
11
12impl PostgresBackend {
13    pub async fn new(pool: PgPool, table_name: &str) -> Result<Self> {
14        let backend = Self {
15            pool,
16            table_name: table_name.to_string(),
17            dlq_table_name: format!("{}_dlq", table_name),
18        };
19        
20        // Initialize tables if they don't exist
21        backend.init_tables().await?;
22        
23        Ok(backend)
24    }
25
26    async fn init_tables(&self) -> Result<()> {
27        // Create main queue table
28        sqlx::query(&format!(
29            r#"CREATE TABLE IF NOT EXISTS {} (
30                id TEXT PRIMARY KEY,
31                payload JSONB NOT NULL,
32                priority INTEGER DEFAULT 0,
33                attempts INTEGER DEFAULT 0,
34                max_attempts INTEGER DEFAULT 3,
35                scheduled_at TIMESTAMP WITH TIME ZONE,
36                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
37                updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
38                status VARCHAR(20) DEFAULT 'pending'
39            )"#,
40            self.table_name
41        ))
42        .execute(&self.pool)
43        .await
44        .map_err(|e| QueueError::BackendError(e.to_string()))?;
45
46        // Create dead letter queue table
47        sqlx::query(&format!(
48            r#"CREATE TABLE IF NOT EXISTS {} (
49                id TEXT PRIMARY KEY,
50                payload JSONB NOT NULL,
51                priority INTEGER DEFAULT 0,
52                attempts INTEGER DEFAULT 0,
53                error TEXT,
54                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
55                updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
56                status VARCHAR(20) DEFAULT 'dead_letter'
57            )"#,
58            self.dlq_table_name
59        ))
60        .execute(&self.pool)
61        .await
62        .map_err(|e| QueueError::BackendError(e.to_string()))?;
63
64        Ok(())
65    }
66}
67
68#[async_trait]
69impl QueueBackend for PostgresBackend {
70    async fn enqueue(&self, mut job: JobWrapper) -> Result<()> {
71        job.status = crate::job::JobStatus::Pending;
72        
73        sqlx::query(&format!(
74            r#"INSERT INTO {} (id, payload, priority, attempts, max_attempts, scheduled_at, status)
75               VALUES ($1, $2, $3, $4, $5, $6, $7)"#,
76            self.table_name
77        ))
78        .bind(&job.id)
79        .bind(serde_json::to_value(&job)?)
80        .bind(job.priority)
81        .bind(job.attempts as i32)
82        .bind(job.max_retries as i32)
83        .bind(job.scheduled_at.map(|t| chrono::DateTime::<chrono::Utc>::from_timestamp(t, 0)))
84        .bind("pending")
85        .execute(&self.pool)
86        .await
87        .map_err(|e| QueueError::BackendError(e.to_string()))?;
88
89        Ok(())
90    }
91
92    async fn dequeue(&self) -> Result<Option<JobWrapper>> {
93        // Get the next job that is pending and ready to be processed
94        let row = sqlx::query(&format!(
95            r#"UPDATE {}
96               SET status = 'running', attempts = attempts + 1, updated_at = NOW()
97               WHERE id = (
98                   SELECT id FROM {}
99                   WHERE status = 'pending'
100                   AND (scheduled_at IS NULL OR scheduled_at <= NOW())
101                   ORDER BY priority DESC, created_at ASC
102                   LIMIT 1
103                   FOR UPDATE SKIP LOCKED
104               )
105               RETURNING payload"#,
106            self.table_name, self.table_name
107        ))
108        .fetch_optional(&self.pool)
109        .await
110        .map_err(|e| QueueError::BackendError(e.to_string()))?;
111
112        if let Some(row) = row {
113            let payload: serde_json::Value = row.try_get("payload")
114                .map_err(|e| QueueError::BackendError(e.to_string()))?;
115            let mut job: JobWrapper = serde_json::from_value(payload)
116                .map_err(|e| QueueError::SerializationError(e))?;
117            job.status = crate::job::JobStatus::Running;
118            job.attempts += 1;
119            Ok(Some(job))
120        } else {
121            Ok(None)
122        }
123    }
124
125    async fn complete(&self, job_id: &str) -> Result<()> {
126        sqlx::query(&format!(
127            r#"DELETE FROM {} WHERE id = $1"#,
128            self.table_name
129        ))
130        .bind(job_id)
131        .execute(&self.pool)
132        .await
133        .map_err(|e| QueueError::BackendError(e.to_string()))?;
134
135        Ok(())
136    }
137
138    async fn fail(&self, job_id: &str, error: String) -> Result<()> {
139        // Check if max attempts reached
140        let row = sqlx::query(&format!(
141            r#"SELECT attempts, max_retries FROM {} WHERE id = $1"#,
142            self.table_name
143        ))
144        .bind(job_id)
145        .fetch_optional(&self.pool)
146        .await
147        .map_err(|e| QueueError::BackendError(e.to_string()))?;
148
149        if let Some(row) = row {
150            let attempts: i32 = row.try_get("attempts")
151                .map_err(|e| QueueError::BackendError(e.to_string()))?;
152            let max_retries: i32 = row.try_get("max_retries")
153                .map_err(|e| QueueError::BackendError(e.to_string()))?;
154
155            if attempts >= max_retries {
156                // Move to dead letter queue
157                self.move_to_dead_letter_with_error(job_id, error).await?;
158            } else {
159                // Update status back to pending for retry
160                sqlx::query(&format!(
161                    r#"UPDATE {} SET status = 'pending', updated_at = NOW() WHERE id = $1"#,
162                    self.table_name
163                ))
164                .bind(job_id)
165                .execute(&self.pool)
166                .await
167                .map_err(|e| QueueError::BackendError(e.to_string()))?;
168            }
169        }
170
171        Ok(())
172    }
173
174    async fn retry(&self, job: JobWrapper) -> Result<()> {
175        // First, delete the existing job
176        sqlx::query(&format!(
177            r#"DELETE FROM {} WHERE id = $1"#,
178            self.table_name
179        ))
180        .bind(&job.id)
181        .execute(&self.pool)
182        .await
183        .map_err(|e| QueueError::BackendError(e.to_string()))?;
184
185        // Then re-enqueue with reset attempts
186        let mut job = job;
187        job.attempts = 0;
188        job.status = crate::job::JobStatus::Pending;
189        
190        self.enqueue(job).await
191    }
192
193    async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
194        self.move_to_dead_letter_with_error(&job.id, job.error.unwrap_or_else(|| "Unknown error".to_string())).await
195    }
196
197    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
198        let rows = sqlx::query(&format!(
199            r#"SELECT payload FROM {} ORDER BY created_at DESC"#,
200            self.dlq_table_name
201        ))
202        .fetch_all(&self.pool)
203        .await
204        .map_err(|e| QueueError::BackendError(e.to_string()))?;
205
206        let mut jobs = Vec::new();
207        for row in rows {
208            let payload: serde_json::Value = row.try_get("payload")
209                .map_err(|e| QueueError::BackendError(e.to_string()))?;
210            if let Ok(job) = serde_json::from_value::<JobWrapper>(payload) {
211                jobs.push(job);
212            }
213        }
214
215        Ok(jobs)
216    }
217
218    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
219        // Get job from DLQ
220        let row = sqlx::query(&format!(
221            r#"SELECT payload FROM {} WHERE id = $1"#,
222            self.dlq_table_name
223        ))
224        .bind(job_id)
225        .fetch_optional(&self.pool)
226        .await
227        .map_err(|e| QueueError::BackendError(e.to_string()))?;
228
229        if let Some(row) = row {
230            let payload: serde_json::Value = row.try_get("payload")
231                .map_err(|e| QueueError::BackendError(e.to_string()))?;
232            let mut job: JobWrapper = serde_json::from_value(payload)
233                .map_err(|e| QueueError::SerializationError(e))?;
234
235            // Delete from DLQ
236            sqlx::query(&format!(
237                r#"DELETE FROM {} WHERE id = $1"#,
238                self.dlq_table_name
239            ))
240            .bind(job_id)
241            .execute(&self.pool)
242            .await
243            .map_err(|e| QueueError::BackendError(e.to_string()))?;
244
245            // Reset and enqueue to main queue
246            job.status = crate::job::JobStatus::Pending;
247            job.attempts = 0;
248            job.error = None;
249
250            self.enqueue(job).await?;
251        }
252
253        Ok(())
254    }
255
256    async fn clear(&self) -> Result<()> {
257        sqlx::query(&format!("DELETE FROM {}", self.table_name))
258            .execute(&self.pool)
259            .await
260            .map_err(|e| QueueError::BackendError(e.to_string()))?;
261        Ok(())
262    }
263}
264
265impl PostgresBackend {
266    async fn move_to_dead_letter_with_error(&self, job_id: &str, error: String) -> Result<()> {
267        // Get the job from the main queue
268        let row = sqlx::query(&format!(
269            r#"SELECT payload, priority, attempts FROM {} WHERE id = $1"#,
270            self.table_name
271        ))
272        .bind(job_id)
273        .fetch_optional(&self.pool)
274        .await
275        .map_err(|e| QueueError::BackendError(e.to_string()))?;
276
277        if let Some(row) = row {
278            let payload: serde_json::Value = row.try_get("payload")
279                .map_err(|e| QueueError::BackendError(e.to_string()))?;
280            let priority: i32 = row.try_get("priority")
281                .map_err(|e| QueueError::BackendError(e.to_string()))?;
282            let attempts: i32 = row.try_get("attempts")
283                .map_err(|e| QueueError::BackendError(e.to_string()))?;
284
285            // Insert into dead letter queue
286            sqlx::query(&format!(
287                r#"INSERT INTO {} (id, payload, priority, attempts, error)
288                   VALUES ($1, $2, $3, $4, $5)"#,
289                self.dlq_table_name
290            ))
291            .bind(job_id)
292            .bind(&payload)
293            .bind(priority)
294            .bind(attempts)
295            .bind(error)
296            .execute(&self.pool)
297            .await
298            .map_err(|e| QueueError::BackendError(e.to_string()))?;
299
300            // Remove from main queue
301            sqlx::query(&format!(
302                r#"DELETE FROM {} WHERE id = $1"#,
303                self.table_name
304            ))
305            .bind(job_id)
306            .execute(&self.pool)
307            .await
308            .map_err(|e| QueueError::BackendError(e.to_string()))?;
309        }
310
311        Ok(())
312    }
313}