use async_trait::async_trait;
use redis::{Client, AsyncCommands};
use crate::{QueueBackend, job::JobWrapper, Result, QueueError};

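/// Redis-backed queue implementation. Jobs are stored as JSON strings in a
/// Redis list under `queue_key`; a separate list (`{queue_key}_dlq`) holds
/// dead-lettered jobs.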
pub struct RedisBackend {
    client: Client,
    queue_key: String,
    dlq_key: String,
}

impl RedisBackend {
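    /// Creates a backend from a Redis connection URL and a queue key.
    /// The dead-letter queue key is derived as `{queue_key}_dlq`.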
    pub fn new(url: &str, queue_key: &str) -> Result<Self> {
        let client = Client::open(url)
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let dlq_key = format!("{}_dlq", queue_key);

        Ok(Self {
            client,
            queue_key: queue_key.to_string(),
            dlq_key,
        })
    }
}

#[async_trait]
impl QueueBackend for RedisBackend {
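    /// Serializes the job and pushes it onto the head of the queue list
    /// (LPUSH). Combined with RPOP in `dequeue`, this gives FIFO ordering.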
    async fn enqueue(&self, job: JobWrapper) -> Result<()> {
        let mut conn = self.client.get_multiplexed_async_connection()
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let payload = serde_json::to_string(&job)?;

        let _: () = conn.lpush(&self.queue_key, payload)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        Ok(())
    }

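    /// Pops the oldest job from the tail of the queue list (RPOP) and
    /// deserializes it. Returns `Ok(None)` when the queue is empty.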
    async fn dequeue(&self) -> Result<Option<JobWrapper>> {
        let mut conn = self.client.get_multiplexed_async_connection()
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let result: Option<String> = conn.rpop(&self.queue_key, None)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        if let Some(payload) = result {
            let job: JobWrapper = serde_json::from_str(&payload)?;
            Ok(Some(job))
        } else {
            Ok(None)
        }
    }

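    /// No-op: jobs are removed from the list at dequeue time, so there is
    /// nothing left to clean up on successful completion.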
    async fn complete(&self, _job_id: &str) -> Result<()> {
        Ok(())
    }

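    /// Records a failure by pushing a small JSON record (job id, error
    /// message, timestamp) onto a `{queue_key}_failed` list.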
    async fn fail(&self, job_id: &str, error: String) -> Result<()> {
        let failed_key = format!("{}_failed", self.queue_key);

        let mut conn = self.client.get_multiplexed_async_connection()
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let failed_job = serde_json::json!({
            "job_id": job_id,
            "error": error,
            "failed_at": chrono::Utc::now().to_rfc3339()
        });

        let payload = serde_json::to_string(&failed_job)
            .map_err(QueueError::SerializationError)?;

        let _: () = conn.lpush(&failed_key, payload)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        Ok(())
    }

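    /// Re-enqueues a job for another attempt by delegating to `enqueue`.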
    async fn retry(&self, job: JobWrapper) -> Result<()> {
        self.enqueue(job).await
    }

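    /// Pushes the serialized job onto the dead-letter list.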
    async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
        let mut conn = self.client.get_multiplexed_async_connection()
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let payload = serde_json::to_string(&job)?;
        let _: () = conn.lpush(&self.dlq_key, payload)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        Ok(())
    }

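    /// Returns all jobs currently on the dead-letter list. Entries that fail
    /// to deserialize are skipped rather than aborting the whole listing.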
    async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
        let mut conn = self.client.get_multiplexed_async_connection()
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let results: Vec<String> = conn.lrange(&self.dlq_key, 0, -1)
            .await
            .map_err(|e| QueueError::BackendError(e.to_string()))?;

        let mut jobs = Vec::new();
        for payload in results {
            if let Ok(job) = serde_json::from_str::<JobWrapper>(&payload) {
                jobs.push(job);
            }
        }

        Ok(jobs)
    }

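    /// Moves a dead-lettered job back onto the main queue: the entry is
    /// removed from the DLQ with LREM (which relies on the re-serialized
    /// payload matching the stored string) and re-enqueued with its status,
    /// attempt counter, and error reset. Unknown job ids are silently ignored.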
    async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
        let jobs = self.list_dead_letter().await?;

        if let Some(job) = jobs.iter().find(|j| j.id == job_id) {
            let mut conn = self.client.get_multiplexed_async_connection()
                .await
                .map_err(|e| QueueError::BackendError(e.to_string()))?;

            // Remove the matching entry from the DLQ before re-enqueueing.
            let payload = serde_json::to_string(job)?;
            let _: () = conn.lrem(&self.dlq_key, 1, payload)
                .await
                .map_err(|e| QueueError::BackendError(e.to_string()))?;

            // Reset bookkeeping fields so the job runs as a fresh attempt.
            let mut job_clone = job.clone();
            job_clone.status = crate::job::JobStatus::Pending;
            job_clone.attempts = 0;
            job_clone.error = None;
            self.enqueue(job_clone).await?;
        }

        Ok(())
    }
}