Skip to main content

oxidite_queue/
redis.rs

1use async_trait::async_trait;
2use redis::{Client, AsyncCommands};
3use crate::{QueueBackend, job::JobWrapper, Result, QueueError};
4
5/// Redis queue backend
6pub struct RedisBackend {
7    client: Client,
8    queue_key: String,
9    dlq_key: String,
10}
11
12impl RedisBackend {
13    pub fn new(url: &str, queue_key: &str) -> Result<Self> {
14        let client = Client::open(url)
15            .map_err(|e| QueueError::BackendError(e.to_string()))?;
16        
17        let dlq_key = format!("{}_dlq", queue_key);
18        
19        Ok(Self {
20            client,
21            queue_key: queue_key.to_string(),
22            dlq_key,
23        })
24    }
25}
26
27#[async_trait]
28impl QueueBackend for RedisBackend {
29    async fn enqueue(&self, job: JobWrapper) -> Result<()> {
30        let mut conn = self.client.get_multiplexed_async_connection()
31            .await
32            .map_err(|e| QueueError::BackendError(e.to_string()))?;
33            
34        let payload = serde_json::to_string(&job)?;
35        
36        // Use LPUSH to add to the head of the list (or tail, depending on how we want to process)
37        // Standard queue is usually LPUSH (enqueue) and RPOP (dequeue)
38        let _: () = conn.lpush(&self.queue_key, payload)
39            .await
40            .map_err(|e| QueueError::BackendError(e.to_string()))?;
41            
42        Ok(())
43    }
44
45    async fn dequeue(&self) -> Result<Option<JobWrapper>> {
46        let mut conn = self.client.get_multiplexed_async_connection()
47            .await
48            .map_err(|e| QueueError::BackendError(e.to_string()))?;
49            
50        // RPOP removes and returns the last element of the list
51        let result: Option<String> = conn.rpop(&self.queue_key, None)
52            .await
53            .map_err(|e| QueueError::BackendError(e.to_string()))?;
54            
55        if let Some(payload) = result {
56            let job: JobWrapper = serde_json::from_str(&payload)?;
57            Ok(Some(job))
58        } else {
59            Ok(None)
60        }
61    }
62
63    async fn complete(&self, _job_id: &str) -> Result<()> {
64        // In a simple RPOP implementation, the job is already removed from the queue.
65        // For more reliability, we'd use RPOPLPUSH to a processing queue and then remove from there.
66        // For this v1 implementation, we'll keep it simple.
67        Ok(())
68    }
69
70    async fn fail(&self, job_id: &str, error: String) -> Result<()> {
71        // Move failed jobs to a separate failed queue for inspection
72        let failed_key = format!("{}_failed", self.queue_key);
73        
74        let mut conn = self.client.get_multiplexed_async_connection()
75            .await
76            .map_err(|e| QueueError::BackendError(e.to_string()))?;
77        
78        // Store the failed job with error information
79        let failed_job = serde_json::json!({
80            "job_id": job_id,
81            "error": error,
82            "failed_at": chrono::Utc::now().to_rfc3339()
83        });
84        
85        let payload = serde_json::to_string(&failed_job)
86            .map_err(|e| QueueError::SerializationError(e))?;
87        
88        let _: () = conn.lpush(&failed_key, payload)
89            .await
90            .map_err(|e| QueueError::BackendError(e.to_string()))?;
91        
92        Ok(())
93    }
94
95    async fn retry(&self, job: JobWrapper) -> Result<()> {
96        self.enqueue(job).await
97    }
98
99    async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
100        let mut conn = self.client.get_multiplexed_async_connection()
101            .await
102            .map_err(|e| QueueError::BackendError(e.to_string()))?;
103            
104        let payload = serde_json::to_string(&job)?;
105        let _: () = conn.lpush(&self.dlq_key, payload)
106            .await
107            .map_err(|e| QueueError::BackendError(e.to_string()))?;
108            
109        Ok(())
110    }
111
112    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
113        let mut conn = self.client.get_multiplexed_async_connection()
114            .await
115            .map_err(|e| QueueError::BackendError(e.to_string()))?;
116            
117        let results: Vec<String> = conn.lrange(&self.dlq_key, 0, -1)
118            .await
119            .map_err(|e| QueueError::BackendError(e.to_string()))?;
120            
121        let mut jobs = Vec::new();
122        for payload in results {
123            if let Ok(job) = serde_json::from_str::<JobWrapper>(&payload) {
124                jobs.push(job);
125            }
126        }
127        
128        Ok(jobs)
129    }
130
131    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
132        let jobs = self.list_dead_letter().await?;
133        
134        // Find the job by ID
135        if let Some(job) = jobs.iter().find(|j| j.id == job_id) {
136            // Remove from DLQ
137            let mut conn = self.client.get_multiplexed_async_connection()
138                .await
139                .map_err(|e| QueueError::BackendError(e.to_string()))?;
140                
141            let payload = serde_json::to_string(&job)?;
142            let _: () = conn.lrem(&self.dlq_key, 1, payload)
143                .await
144                .map_err(|e| QueueError::BackendError(e.to_string()))?;
145            
146            // Clone and reset before re-enqueue
147            let mut job_clone = job.clone();
148            job_clone.status = crate::job::JobStatus::Pending;
149            job_clone.attempts = 0;
150            job_clone.error = None;
151            self.enqueue(job_clone).await?;
152        }
153        
154        Ok(())
155    }
156}