use async_trait::async_trait;
use redis::{Client, AsyncCommands};
use crate::{QueueBackend, job::JobWrapper, Result, QueueError};
/// Queue backend that stores jobs in Redis lists.
///
/// Holds the Redis client plus the names of the lists it operates on.
pub struct RedisBackend {
    /// Redis client from which a connection is obtained for each operation.
    client: Client,
    /// Key of the Redis list holding queued jobs.
    queue_key: String,
    /// Key of the Redis list holding dead-lettered jobs
    /// (set to `<queue_key>_dlq` by `RedisBackend::new`).
    dlq_key: String,
}
impl RedisBackend {
    /// Builds a backend for the Redis instance at `url`, using `queue_key`
    /// as the name of the main job list.
    ///
    /// The dead-letter list name is derived by appending `_dlq` to
    /// `queue_key`.
    ///
    /// # Errors
    ///
    /// Returns `QueueError::BackendError` carrying the client's error message
    /// when `Client::open` rejects `url`.
    pub fn new(url: &str, queue_key: &str) -> Result<Self> {
        match Client::open(url) {
            Ok(client) => Ok(Self {
                client,
                queue_key: queue_key.to_owned(),
                dlq_key: format!("{}_dlq", queue_key),
            }),
            Err(open_err) => Err(QueueError::BackendError(open_err.to_string())),
        }
    }
}
#[async_trait]
impl QueueBackend for RedisBackend {
async fn enqueue(&self, job: JobWrapper) -> Result<()> {
let mut conn = self.client.get_multiplexed_async_connection()
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
let payload = serde_json::to_string(&job)?;
let _: () = conn.lpush(&self.queue_key, payload)
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
Ok(())
}
async fn dequeue(&self) -> Result<Option<JobWrapper>> {
let mut conn = self.client.get_multiplexed_async_connection()
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
let result: Option<String> = conn.rpop(&self.queue_key, None)
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
if let Some(payload) = result {
let job: JobWrapper = serde_json::from_str(&payload)?;
Ok(Some(job))
} else {
Ok(None)
}
}
async fn complete(&self, _job_id: &str) -> Result<()> {
Ok(())
}
async fn fail(&self, job_id: &str, error: String) -> Result<()> {
let failed_key = format!("{}_failed", self.queue_key);
let mut conn = self.client.get_multiplexed_async_connection()
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
let failed_job = serde_json::json!({
"job_id": job_id,
"error": error,
"failed_at": chrono::Utc::now().to_rfc3339()
});
let payload = serde_json::to_string(&failed_job)
.map_err(|e| QueueError::SerializationError(e))?;
let _: () = conn.lpush(&failed_key, payload)
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
Ok(())
}
async fn retry(&self, job: JobWrapper) -> Result<()> {
self.enqueue(job).await
}
async fn move_to_dead_letter(&self, job: JobWrapper) -> Result<()> {
let mut conn = self.client.get_multiplexed_async_connection()
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
let payload = serde_json::to_string(&job)?;
let _: () = conn.lpush(&self.dlq_key, payload)
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
Ok(())
}
async fn list_dead_letter(&self) -> Result<Vec<JobWrapper>> {
let mut conn = self.client.get_multiplexed_async_connection()
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
let results: Vec<String> = conn.lrange(&self.dlq_key, 0, -1)
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
let mut jobs = Vec::new();
for payload in results {
if let Ok(job) = serde_json::from_str::<JobWrapper>(&payload) {
jobs.push(job);
}
}
Ok(jobs)
}
async fn retry_from_dead_letter(&self, job_id: &str) -> Result<()> {
let jobs = self.list_dead_letter().await?;
if let Some(job) = jobs.iter().find(|j| j.id == job_id) {
let mut conn = self.client.get_multiplexed_async_connection()
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
let payload = serde_json::to_string(&job)?;
let _: () = conn.lrem(&self.dlq_key, 1, payload)
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
let mut job_clone = job.clone();
job_clone.status = crate::job::JobStatus::Pending;
job_clone.attempts = 0;
job_clone.error = None;
self.enqueue(job_clone).await?;
}
Ok(())
}
async fn clear(&self) -> Result<()> {
let mut conn = self.client.get_multiplexed_async_connection()
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
let _: () = conn.del(&self.queue_key)
.await
.map_err(|e| QueueError::BackendError(e.to_string()))?;
Ok(())
}
}