use async_trait::async_trait;
use redis::{Client, AsyncCommands};
use crate::{QueueBackend, job::JobWrapper, Result, QueueError};

/// Queue backend backed by Redis lists: one list holds pending jobs and a
/// second list (`<queue_key>_dlq`) holds dead-lettered jobs.
pub struct RedisBackend {
    client: Client,
    queue_key: String,
    dlq_key: String,
}

impl RedisBackend {
    /// Creates a backend from a Redis connection URL and a queue key.
    /// The dead-letter queue key is derived by appending `_dlq`.
    pub fn new(url: &str, queue_key: &str) -> Result<Self> {
        let client = Client::open(url)
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let dlq_key = format!("{}_dlq", queue_key);

        Ok(Self {
            client,
            queue_key: queue_key.to_string(),
            dlq_key,
        })
    }
}

#[async_trait]
impl QueueBackend for RedisBackend {
    /// Serializes the job and pushes it onto the head of the queue list.
    async fn enqueue(&self, job: JobWrapper) -> Result<()> {
        let mut conn = self.client.get_multiplexed_async_connection()
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let payload = serde_json::to_string(&job)?;

        let _: () = conn.lpush(&self.queue_key, payload)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        Ok(())
    }

    /// Pops a job from the tail of the queue list; combined with LPUSH in
    /// `enqueue`, this gives FIFO ordering. Returns `None` when the queue is empty.
    async fn dequeue(&self) -> Result<Option<JobWrapper>> {
        let mut conn = self.client.get_multiplexed_async_connection()
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let result: Option<String> = conn.rpop(&self.queue_key, None)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        if let Some(payload) = result {
            let job: JobWrapper = serde_json::from_str(&payload)?;
            Ok(Some(job))
        } else {
            Ok(None)
        }
    }

    /// No-op: RPOP already removed the job from the list, so there is no
    /// separate acknowledgement step for completed jobs.
    async fn complete(&self, _job_id: &str) -> Result<()> {
        Ok(())
    }

    /// No-op: this backend keeps no per-job bookkeeping; failed jobs are
    /// handled by the caller via `retry` or `move_to_dead_letter`.
    async fn fail(&self, _job_id: &str, _error: String) -> Result<()> {
        Ok(())
    }

    /// Re-enqueues the job for another attempt.
    async fn retry(&self, job: JobWrapper) -> Result<()> {
        self.enqueue(job).await
    }

    /// Pushes the job onto the dead-letter list for later inspection.
    async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
        let mut conn = self.client.get_multiplexed_async_connection()
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let payload = serde_json::to_string(&job)?;
        let _: () = conn.lpush(&self.dlq_key, payload)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        Ok(())
    }

    /// Returns every job currently in the dead-letter list, skipping entries
    /// that fail to deserialize.
    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
        let mut conn = self.client.get_multiplexed_async_connection()
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let results: Vec<String> = conn.lrange(&self.dlq_key, 0, -1)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let mut jobs = Vec::new();
        for payload in results {
            if let Ok(job) = serde_json::from_str::<JobWrapper>(&payload) {
                jobs.push(job);
            }
        }

        Ok(jobs)
    }

    /// Removes the job with the given id from the dead-letter list, resets its
    /// status, attempt count, and error, then re-enqueues it. Does nothing if
    /// no dead-lettered job has that id.
    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
        let jobs = self.list_dead_letter().await?;

        if let Some(job) = jobs.iter().find(|j| j.id == job_id) {
            let mut conn = self.client.get_multiplexed_async_connection()
                .await
                .map_err(|e| QueueError::BackendError(e.to_string()))?;

            // LREM matches by value, so the job is re-serialized here; this
            // relies on serde_json producing the same string as the stored payload.
            let payload = serde_json::to_string(&job)?;
            let _: () = conn.lrem(&self.dlq_key, 1, payload)
                .await
                .map_err(|e| QueueError::BackendError(e.to_string()))?;

            let mut job_clone = job.clone();
            job_clone.status = crate::job::JobStatus::Pending;
            job_clone.attempts = 0;
            job_clone.error = None;
            self.enqueue(job_clone).await?;
        }

        Ok(())
    }
}
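
// A minimal usage sketch, not part of the original module: it assumes a local
// Redis server at redis://127.0.0.1/ and that `tokio` (with the `macros` and
// `rt` features) is available as a dev-dependency; the queue key
// "example_jobs" is made up for illustration.
#[cfg(test)]
mod tests {
    use super::*;

    // Ignored by default because it needs a running Redis instance.
    #[tokio::test]
    #[ignore]
    async fn dequeue_on_empty_queue_returns_none() {
        let backend = RedisBackend::new("redis://127.0.0.1/", "example_jobs")
            .expect("failed to open Redis client");

        // With nothing enqueued under this key, dequeue should yield Ok(None)
        // rather than an error.
        let job = backend.dequeue().await.expect("dequeue failed");
        assert!(job.is_none());
    }
}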