oxidite_queue/
redis.rs

1use async_trait::async_trait;
2use redis::{Client, AsyncCommands};
3use crate::{QueueBackend, job::JobWrapper, Result, QueueError};
4
5/// Redis queue backend
6pub struct RedisBackend {
7    client: Client,
8    queue_key: String,
9    dlq_key: String,
10}
11
12impl RedisBackend {
13    pub fn new(url: &str, queue_key: &str) -> Result<Self> {
14        let client = Client::open(url)
15            .map_err(|e| QueueError::BackendError(e.to_string()))?;
16        
17        let dlq_key = format!("{}_dlq", queue_key);
18        
19        Ok(Self {
20            client,
21            queue_key: queue_key.to_string(),
22            dlq_key,
23        })
24    }
25}
26
27#[async_trait]
28impl QueueBackend for RedisBackend {
29    async fn enqueue(&self, job: JobWrapper) -> Result<()> {
30        let mut conn = self.client.get_multiplexed_async_connection()
31            .await
32            .map_err(|e| QueueError::BackendError(e.to_string()))?;
33            
34        let payload = serde_json::to_string(&job)?;
35        
36        // Use LPUSH to add to the head of the list (or tail, depending on how we want to process)
37        // Standard queue is usually LPUSH (enqueue) and RPOP (dequeue)
38        let _: () = conn.lpush(&self.queue_key, payload)
39            .await
40            .map_err(|e| QueueError::BackendError(e.to_string()))?;
41            
42        Ok(())
43    }
44
45    async fn dequeue(&self) -> Result<Option<JobWrapper>> {
46        let mut conn = self.client.get_multiplexed_async_connection()
47            .await
48            .map_err(|e| QueueError::BackendError(e.to_string()))?;
49            
50        // RPOP removes and returns the last element of the list
51        let result: Option<String> = conn.rpop(&self.queue_key, None)
52            .await
53            .map_err(|e| QueueError::BackendError(e.to_string()))?;
54            
55        if let Some(payload) = result {
56            let job: JobWrapper = serde_json::from_str(&payload)?;
57            Ok(Some(job))
58        } else {
59            Ok(None)
60        }
61    }
62
63    async fn complete(&self, _job_id: &str) -> Result<()> {
64        // In a simple RPOP implementation, the job is already removed from the queue.
65        // For more reliability, we'd use RPOPLPUSH to a processing queue and then remove from there.
66        // For this v1 implementation, we'll keep it simple.
67        Ok(())
68    }
69
70    async fn fail(&self, _job_id: &str, _error: String) -> Result<()> {
71        // Similarly, we might want to move to a failed queue.
72        // TODO: Implement failed queue logic
73        Ok(())
74    }
75
76    async fn retry(&self, job: JobWrapper) -> Result<()> {
77        self.enqueue(job).await
78    }
79
80    async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
81        let mut conn = self.client.get_multiplexed_async_connection()
82            .await
83            .map_err(|e| QueueError::BackendError(e.to_string()))?;
84            
85        let payload = serde_json::to_string(&job)?;
86        let _: () = conn.lpush(&self.dlq_key, payload)
87            .await
88            .map_err(|e| QueueError::BackendError(e.to_string()))?;
89            
90        Ok(())
91    }
92
93    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
94        let mut conn = self.client.get_multiplexed_async_connection()
95            .await
96            .map_err(|e| QueueError::BackendError(e.to_string()))?;
97            
98        let results: Vec<String> = conn.lrange(&self.dlq_key, 0, -1)
99            .await
100            .map_err(|e| QueueError::BackendError(e.to_string()))?;
101            
102        let mut jobs = Vec::new();
103        for payload in results {
104            if let Ok(job) = serde_json::from_str::<JobWrapper>(&payload) {
105                jobs.push(job);
106            }
107        }
108        
109        Ok(jobs)
110    }
111
112    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
113        let jobs = self.list_dead_letter().await?;
114        
115        // Find the job by ID
116        if let Some(job) = jobs.iter().find(|j| j.id == job_id) {
117            // Remove from DLQ
118            let mut conn = self.client.get_multiplexed_async_connection()
119                .await
120                .map_err(|e| QueueError::BackendError(e.to_string()))?;
121                
122            let payload = serde_json::to_string(&job)?;
123            let _: () = conn.lrem(&self.dlq_key, 1, payload)
124                .await
125                .map_err(|e| QueueError::BackendError(e.to_string()))?;
126            
127            // Clone and reset before re-enqueue
128            let mut job_clone = job.clone();
129            job_clone.status = crate::job::JobStatus::Pending;
130            job_clone.attempts = 0;
131            job_clone.error = None;
132            self.enqueue(job_clone).await?;
133        }
134        
135        Ok(())
136    }
137}