use crate::error::{Result, TidewayError};
use crate::traits::job::{Job, JobData, JobQueue};
use async_trait::async_trait;
#[cfg(feature = "jobs")]
use chrono::{DateTime, Duration, Utc};
use uuid::Uuid;
#[derive(Clone)]
pub struct RedisJobQueue {
client: redis::Client,
worker_id: String,
max_retries: u32,
retry_backoff_seconds: u64,
health: std::sync::Arc<std::sync::Mutex<bool>>,
}
impl RedisJobQueue {
pub fn new(url: &str, worker_id: Option<String>, max_retries: u32, retry_backoff_seconds: u64) -> Result<Self> {
let client = redis::Client::open(url)
.map_err(|e| TidewayError::internal(format!("Failed to create Redis client: {}", e)))?;
let worker_id = worker_id.unwrap_or_else(|| Uuid::new_v4().to_string());
let queue = Self {
client,
worker_id,
max_retries,
retry_backoff_seconds,
health: std::sync::Arc::new(std::sync::Mutex::new(true)),
};
queue.start_scheduler_task();
Ok(queue)
}
async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection> {
self.client
.get_multiplexed_async_connection()
.await
.map_err(|e| TidewayError::internal(format!("Failed to get Redis connection: {}", e)))
}
fn start_scheduler_task(&self) {
let client = self.client.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop {
interval.tick().await;
if let Ok(mut conn) = client.get_multiplexed_async_connection().await {
let now = Utc::now().timestamp();
let scheduled_key = "jobs:scheduled";
let results: redis::RedisResult<Vec<(String, f64)>> = redis::cmd("ZRANGEBYSCORE")
.arg(scheduled_key)
.arg("-inf")
.arg(now)
.arg("WITHSCORES")
.query_async(&mut conn)
.await;
if let Ok(jobs) = results {
for (job_json, _score) in jobs {
let _: redis::RedisResult<()> = redis::cmd("ZREM")
.arg(scheduled_key)
.arg(&job_json)
.query_async(&mut conn)
.await;
let _: redis::RedisResult<()> = redis::cmd("LPUSH")
.arg("jobs:pending")
.arg(&job_json)
.query_async(&mut conn)
.await;
}
}
}
}
});
}
}
#[async_trait]
impl JobQueue for RedisJobQueue {
async fn enqueue(&self, job: &dyn Job) -> Result<String> {
let job_id = Uuid::new_v4().to_string();
let payload = job.serialize()?;
let job_data = JobData::new(
job_id.clone(),
job.job_type().to_string(),
payload,
self.max_retries,
);
let job_json = serde_json::to_string(&job_data)
.map_err(|e| TidewayError::internal(format!("Failed to serialize job: {}", e)))?;
let mut conn = self.get_connection().await?;
redis::cmd("LPUSH")
.arg("jobs:pending")
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to enqueue job: {}", e)))?;
Ok(job_id)
}
async fn dequeue(&self) -> Result<Option<JobData>> {
let mut conn = self.get_connection().await?;
let processing_key = format!("jobs:processing:{}", self.worker_id);
let result: Option<String> = redis::cmd("BRPOPLPUSH")
.arg("jobs:pending")
.arg(&processing_key)
.arg(5) .query_async(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to dequeue job: {}", e)))?;
if let Some(job_json) = result {
let job_data: JobData = serde_json::from_str(&job_json)
.map_err(|e| TidewayError::internal(format!("Failed to deserialize job: {}", e)))?;
Ok(Some(job_data))
} else {
Ok(None)
}
}
async fn complete(&self, job_id: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let processing_key = format!("jobs:processing:{}", self.worker_id);
let jobs: Vec<String> = redis::cmd("LRANGE")
.arg(&processing_key)
.arg(0)
.arg(-1)
.query_async(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to list processing jobs: {}", e)))?;
for job_json in jobs {
if let Ok(job_data) = serde_json::from_str::<JobData>(&job_json) {
if job_data.job_id == job_id {
redis::cmd("LREM")
.arg(&processing_key)
.arg(1)
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to remove job from processing: {}", e)))?;
return Ok(());
}
}
}
Ok(())
}
async fn fail(&self, job_id: &str, _error: String) -> Result<()> {
let mut conn = self.get_connection().await?;
let processing_key = format!("jobs:processing:{}", self.worker_id);
let jobs: Vec<String> = redis::cmd("LRANGE")
.arg(&processing_key)
.arg(0)
.arg(-1)
.query_async(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to list processing jobs: {}", e)))?;
for job_json in jobs {
if let Ok(mut job_data) = serde_json::from_str::<JobData>(&job_json) {
if job_data.job_id == job_id {
redis::cmd("LREM")
.arg(&processing_key)
.arg(1)
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to remove job from processing: {}", e)))?;
if job_data.should_retry() {
let backoff_seconds = self.retry_backoff_seconds * (2_u64.pow(job_data.retry_count));
let retry_at = Utc::now() + Duration::seconds(backoff_seconds as i64);
job_data.increment_retry();
let retry_json = serde_json::to_string(&job_data)
.map_err(|e| TidewayError::internal(format!("Failed to serialize job for retry: {}", e)))?;
redis::cmd("ZADD")
.arg("jobs:scheduled")
.arg(retry_at.timestamp())
.arg(&retry_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to schedule retry: {}", e)))?;
} else {
redis::cmd("LPUSH")
.arg("jobs:failed")
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to add to failed list: {}", e)))?;
}
return Ok(());
}
}
}
Ok(())
}
async fn retry(&self, job_id: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let processing_key = format!("jobs:processing:{}", self.worker_id);
let jobs: Vec<String> = redis::cmd("LRANGE")
.arg(&processing_key)
.arg(0)
.arg(-1)
.query_async(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to list processing jobs: {}", e)))?;
for job_json in jobs {
if let Ok(mut job_data) = serde_json::from_str::<JobData>(&job_json) {
if job_data.job_id == job_id {
redis::cmd("LREM")
.arg(&processing_key)
.arg(1)
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to remove job from processing: {}", e)))?;
if job_data.should_retry() {
job_data.increment_retry();
let retry_json = serde_json::to_string(&job_data)
.map_err(|e| TidewayError::internal(format!("Failed to serialize job for retry: {}", e)))?;
redis::cmd("LPUSH")
.arg("jobs:pending")
.arg(&retry_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to retry job: {}", e)))?;
} else {
redis::cmd("LPUSH")
.arg("jobs:failed")
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to add to failed list: {}", e)))?;
}
return Ok(());
}
}
}
Ok(())
}
#[cfg(feature = "jobs")]
async fn schedule(&self, job: &dyn Job, run_at: DateTime<Utc>) -> Result<String> {
let job_id = Uuid::new_v4().to_string();
let payload = job.serialize()?;
let job_data = JobData::scheduled(
job_id.clone(),
job.job_type().to_string(),
payload,
self.max_retries,
run_at,
);
let job_json = serde_json::to_string(&job_data)
.map_err(|e| TidewayError::internal(format!("Failed to serialize job: {}", e)))?;
let mut conn = self.get_connection().await?;
redis::cmd("ZADD")
.arg("jobs:scheduled")
.arg(run_at.timestamp())
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to schedule job: {}", e)))?;
Ok(job_id)
}
fn is_healthy(&self) -> bool {
*self.health.lock().unwrap_or_else(|e| e.into_inner())
}
}