oxidite_queue/
postgres.rs

1use async_trait::async_trait;
2use sqlx::{PgPool, Row};
3use crate::{QueueBackend, job::JobWrapper, Result, QueueError};
4
5/// PostgreSQL queue backend
6pub struct PostgresBackend {
7    pool: PgPool,
8    table_name: String,
9    dlq_table_name: String,
10}
11
12impl PostgresBackend {
13    pub async fn new(pool: PgPool, table_name: &str) -> Result<Self> {
14        let backend = Self {
15            pool,
16            table_name: table_name.to_string(),
17            dlq_table_name: format!("{}_dlq", table_name),
18        };
19        
20        // Initialize tables if they don't exist
21        backend.init_tables().await?;
22        
23        Ok(backend)
24    }
25
26    async fn init_tables(&self) -> Result<()> {
27        // Create main queue table
28        sqlx::query(&format!(
29            r#"CREATE TABLE IF NOT EXISTS {} (
30                id TEXT PRIMARY KEY,
31                payload JSONB NOT NULL,
32                priority INTEGER DEFAULT 0,
33                attempts INTEGER DEFAULT 0,
34                max_attempts INTEGER DEFAULT 3,
35                scheduled_at TIMESTAMP WITH TIME ZONE,
36                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
37                updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
38                status VARCHAR(20) DEFAULT 'pending'
39            )"#,
40            self.table_name
41        ))
42        .execute(&self.pool)
43        .await
44        .map_err(|e| QueueError::BackendError(e.to_string()))?;
45
46        // Create dead letter queue table
47        sqlx::query(&format!(
48            r#"CREATE TABLE IF NOT EXISTS {} (
49                id TEXT PRIMARY KEY,
50                payload JSONB NOT NULL,
51                priority INTEGER DEFAULT 0,
52                attempts INTEGER DEFAULT 0,
53                error TEXT,
54                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
55                updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
56                status VARCHAR(20) DEFAULT 'dead_letter'
57            )"#,
58            self.dlq_table_name
59        ))
60        .execute(&self.pool)
61        .await
62        .map_err(|e| QueueError::BackendError(e.to_string()))?;
63
64        Ok(())
65    }
66}
67
68#[async_trait]
69impl QueueBackend for PostgresBackend {
70    async fn enqueue(&self, mut job: JobWrapper) -> Result<()> {
71        job.status = crate::job::JobStatus::Pending;
72        
73        sqlx::query(&format!(
74            r#"INSERT INTO {} (id, payload, priority, attempts, max_attempts, scheduled_at, status)
75               VALUES ($1, $2, $3, $4, $5, $6, $7)"#,
76            self.table_name
77        ))
78        .bind(&job.id)
79        .bind(serde_json::to_value(&job)?)
80        .bind(job.priority)
81        .bind(job.attempts as i32)
82        .bind(job.max_retries as i32)
83        .bind(job.scheduled_at.map(|t| chrono::DateTime::<chrono::Utc>::from_timestamp(t, 0)))
84        .bind("pending")
85        .execute(&self.pool)
86        .await
87        .map_err(|e| QueueError::BackendError(e.to_string()))?;
88
89        Ok(())
90    }
91
92    async fn dequeue(&self) -> Result<Option<JobWrapper>> {
93        // Get the next job that is pending and ready to be processed
94        let row = sqlx::query(&format!(
95            r#"UPDATE {}
96               SET status = 'running', attempts = attempts + 1, updated_at = NOW()
97               WHERE id = (
98                   SELECT id FROM {}
99                   WHERE status = 'pending'
100                   AND (scheduled_at IS NULL OR scheduled_at <= NOW())
101                   ORDER BY priority DESC, created_at ASC
102                   LIMIT 1
103                   FOR UPDATE SKIP LOCKED
104               )
105               RETURNING payload"#,
106            self.table_name, self.table_name
107        ))
108        .fetch_optional(&self.pool)
109        .await
110        .map_err(|e| QueueError::BackendError(e.to_string()))?;
111
112        if let Some(row) = row {
113            let payload: serde_json::Value = row.try_get("payload")
114                .map_err(|e| QueueError::BackendError(e.to_string()))?;
115            let mut job: JobWrapper = serde_json::from_value(payload)
116                .map_err(|e| QueueError::SerializationError(e))?;
117            job.status = crate::job::JobStatus::Running;
118            job.attempts += 1;
119            Ok(Some(job))
120        } else {
121            Ok(None)
122        }
123    }
124
125    async fn complete(&self, job_id: &str) -> Result<()> {
126        sqlx::query(&format!(
127            r#"DELETE FROM {} WHERE id = $1"#,
128            self.table_name
129        ))
130        .bind(job_id)
131        .execute(&self.pool)
132        .await
133        .map_err(|e| QueueError::BackendError(e.to_string()))?;
134
135        Ok(())
136    }
137
138    async fn fail(&self, job_id: &str, error: String) -> Result<()> {
139        // Check if max attempts reached
140        let row = sqlx::query(&format!(
141            r#"SELECT attempts, max_retries FROM {} WHERE id = $1"#,
142            self.table_name
143        ))
144        .bind(job_id)
145        .fetch_optional(&self.pool)
146        .await
147        .map_err(|e| QueueError::BackendError(e.to_string()))?;
148
149        if let Some(row) = row {
150            let attempts: i32 = row.try_get("attempts")
151                .map_err(|e| QueueError::BackendError(e.to_string()))?;
152            let max_retries: i32 = row.try_get("max_retries")
153                .map_err(|e| QueueError::BackendError(e.to_string()))?;
154
155            if attempts >= max_retries {
156                // Move to dead letter queue
157                self.move_to_dead_letter_with_error(job_id, error).await?;
158            } else {
159                // Update status back to pending for retry
160                sqlx::query(&format!(
161                    r#"UPDATE {} SET status = 'pending', updated_at = NOW() WHERE id = $1"#,
162                    self.table_name
163                ))
164                .bind(job_id)
165                .execute(&self.pool)
166                .await
167                .map_err(|e| QueueError::BackendError(e.to_string()))?;
168            }
169        }
170
171        Ok(())
172    }
173
174    async fn retry(&self, job: JobWrapper) -> Result<()> {
175        // First, delete the existing job
176        sqlx::query(&format!(
177            r#"DELETE FROM {} WHERE id = $1"#,
178            self.table_name
179        ))
180        .bind(&job.id)
181        .execute(&self.pool)
182        .await
183        .map_err(|e| QueueError::BackendError(e.to_string()))?;
184
185        // Then re-enqueue with reset attempts
186        let mut job = job;
187        job.attempts = 0;
188        job.status = crate::job::JobStatus::Pending;
189        
190        self.enqueue(job).await
191    }
192
193    async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
194        self.move_to_dead_letter_with_error(&job.id, job.error.unwrap_or_else(|| "Unknown error".to_string())).await
195    }
196
197    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
198        let rows = sqlx::query(&format!(
199            r#"SELECT payload FROM {} ORDER BY created_at DESC"#,
200            self.dlq_table_name
201        ))
202        .fetch_all(&self.pool)
203        .await
204        .map_err(|e| QueueError::BackendError(e.to_string()))?;
205
206        let mut jobs = Vec::new();
207        for row in rows {
208            let payload: serde_json::Value = row.try_get("payload")
209                .map_err(|e| QueueError::BackendError(e.to_string()))?;
210            if let Ok(job) = serde_json::from_value::<JobWrapper>(payload) {
211                jobs.push(job);
212            }
213        }
214
215        Ok(jobs)
216    }
217
218    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
219        // Get job from DLQ
220        let row = sqlx::query(&format!(
221            r#"SELECT payload FROM {} WHERE id = $1"#,
222            self.dlq_table_name
223        ))
224        .bind(job_id)
225        .fetch_optional(&self.pool)
226        .await
227        .map_err(|e| QueueError::BackendError(e.to_string()))?;
228
229        if let Some(row) = row {
230            let payload: serde_json::Value = row.try_get("payload")
231                .map_err(|e| QueueError::BackendError(e.to_string()))?;
232            let mut job: JobWrapper = serde_json::from_value(payload)
233                .map_err(|e| QueueError::SerializationError(e))?;
234
235            // Delete from DLQ
236            sqlx::query(&format!(
237                r#"DELETE FROM {} WHERE id = $1"#,
238                self.dlq_table_name
239            ))
240            .bind(job_id)
241            .execute(&self.pool)
242            .await
243            .map_err(|e| QueueError::BackendError(e.to_string()))?;
244
245            // Reset and enqueue to main queue
246            job.status = crate::job::JobStatus::Pending;
247            job.attempts = 0;
248            job.error = None;
249
250            self.enqueue(job).await?;
251        }
252
253        Ok(())
254    }
255}
256
257impl PostgresBackend {
258    async fn move_to_dead_letter_with_error(&self, job_id: &str, error: String) -> Result<()> {
259        // Get the job from the main queue
260        let row = sqlx::query(&format!(
261            r#"SELECT payload, priority, attempts FROM {} WHERE id = $1"#,
262            self.table_name
263        ))
264        .bind(job_id)
265        .fetch_optional(&self.pool)
266        .await
267        .map_err(|e| QueueError::BackendError(e.to_string()))?;
268
269        if let Some(row) = row {
270            let payload: serde_json::Value = row.try_get("payload")
271                .map_err(|e| QueueError::BackendError(e.to_string()))?;
272            let priority: i32 = row.try_get("priority")
273                .map_err(|e| QueueError::BackendError(e.to_string()))?;
274            let attempts: i32 = row.try_get("attempts")
275                .map_err(|e| QueueError::BackendError(e.to_string()))?;
276
277            // Insert into dead letter queue
278            sqlx::query(&format!(
279                r#"INSERT INTO {} (id, payload, priority, attempts, error)
280                   VALUES ($1, $2, $3, $4, $5)"#,
281                self.dlq_table_name
282            ))
283            .bind(job_id)
284            .bind(&payload)
285            .bind(priority)
286            .bind(attempts)
287            .bind(error)
288            .execute(&self.pool)
289            .await
290            .map_err(|e| QueueError::BackendError(e.to_string()))?;
291
292            // Remove from main queue
293            sqlx::query(&format!(
294                r#"DELETE FROM {} WHERE id = $1"#,
295                self.table_name
296            ))
297            .bind(job_id)
298            .execute(&self.pool)
299            .await
300            .map_err(|e| QueueError::BackendError(e.to_string()))?;
301        }
302
303        Ok(())
304    }
305}