use crate::error::{Result, TidewayError};
use crate::traits::job::{Job, JobData, JobQueue};
use async_trait::async_trait;
#[cfg(feature = "jobs")]
use chrono::DateTime;
use chrono::{Duration, Utc};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use uuid::Uuid;
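// Redis key layout used by this queue:
// - `jobs:pending`                   list of serialized `JobData` awaiting workers
// - `jobs:scheduled`                 sorted set of serialized `JobData`, scored by due timestamp
// - `jobs:failed`                    list of jobs that exhausted their retries
// - `jobs:workers`                   set of known worker ids
// - `jobs:processing:<worker>`       per-worker list of in-flight jobs
// - `jobs:worker:heartbeat:<worker>` per-worker heartbeat key with a short TTL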
const PENDING_JOBS_KEY: &str = "jobs:pending";
const SCHEDULED_JOBS_KEY: &str = "jobs:scheduled";
const FAILED_JOBS_KEY: &str = "jobs:failed";
const WORKERS_KEY: &str = "jobs:workers";
const PROCESSING_KEY_PREFIX: &str = "jobs:processing:";
const WORKER_HEARTBEAT_KEY_PREFIX: &str = "jobs:worker:heartbeat:";
const WORKER_HEARTBEAT_TTL_SECONDS: u64 = 30;
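// Atomically moves every scheduled job whose score (due timestamp) is at or
// below the cutoff from the scheduled sorted set onto the pending list and
// returns the number of jobs moved.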
const MOVE_DUE_JOBS_LUA: &str = r#"
local scheduled_key = KEYS[1]
local pending_key = KEYS[2]
local cutoff = tonumber(ARGV[1])
local jobs = redis.call('ZRANGEBYSCORE', scheduled_key, '-inf', cutoff)
for _, job in ipairs(jobs) do
redis.call('ZREM', scheduled_key, job)
redis.call('LPUSH', pending_key, job)
end
return #jobs
"#;
const RECLAIM_STALE_WORKERS_LUA: &str = r#"
local workers_key = KEYS[1]
local pending_key = KEYS[2]
local heartbeat_prefix = ARGV[1]
local processing_prefix = ARGV[2]
local reclaimed = 0
local workers = redis.call('SMEMBERS', workers_key)
for _, worker in ipairs(workers) do
local heartbeat_key = heartbeat_prefix .. worker
if redis.call('EXISTS', heartbeat_key) == 0 then
local processing_key = processing_prefix .. worker
while true do
local job = redis.call('RPOPLPUSH', processing_key, pending_key)
if not job then
break
end
reclaimed = reclaimed + 1
end
redis.call('SREM', workers_key, worker)
end
end
return reclaimed
"#;
fn processing_key(worker_id: &str) -> String {
format!("{PROCESSING_KEY_PREFIX}{worker_id}")
}
fn worker_heartbeat_key(worker_id: &str) -> String {
format!("{WORKER_HEARTBEAT_KEY_PREFIX}{worker_id}")
}
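/// Redis-backed [`JobQueue`] implementation.
///
/// Pending jobs live on a Redis list, scheduled jobs (and retries with
/// backoff) live on a sorted set, and each worker keeps a per-worker
/// processing list plus a heartbeat key so that jobs from crashed workers can
/// be reclaimed by a background scheduler task.
///
/// A minimal usage sketch (not compiled here): it assumes a Redis instance at
/// `redis://127.0.0.1/` and a hypothetical `my_job` value implementing the
/// crate's `Job` trait.
///
/// ```ignore
/// // 3 retries with a 30-second base backoff; a worker id is generated for `None`.
/// let queue = RedisJobQueue::new("redis://127.0.0.1/", None, 3, 30)?;
/// let job_id = queue.enqueue(&my_job).await?;
/// // ... process jobs elsewhere ...
/// queue.shutdown().await;
/// ```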
#[derive(Clone)]
pub struct RedisJobQueue {
client: redis::Client,
worker_id: String,
max_retries: u32,
retry_backoff_seconds: u64,
health_status: Arc<AtomicBool>,
shutdown: Arc<AtomicBool>,
scheduler_handle: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
impl RedisJobQueue {
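    /// Creates the queue, generating a random worker id when `worker_id` is
    /// `None`, and spawns the background scheduler task that maintains this
    /// worker's heartbeat, reclaims jobs from stale workers, and promotes due
    /// scheduled jobs to the pending list.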
pub fn new(
url: &str,
worker_id: Option<String>,
max_retries: u32,
retry_backoff_seconds: u64,
) -> Result<Self> {
let client = redis::Client::open(url)
.map_err(|e| TidewayError::internal(format!("Failed to create Redis client: {}", e)))?;
let worker_id = worker_id.unwrap_or_else(|| Uuid::new_v4().to_string());
let shutdown = Arc::new(AtomicBool::new(false));
let queue = Self {
client,
worker_id,
max_retries,
retry_backoff_seconds,
health_status: Arc::new(AtomicBool::new(true)),
shutdown,
scheduler_handle: Arc::new(tokio::sync::Mutex::new(None)),
};
queue.start_scheduler_task();
Ok(queue)
}
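    /// Signals the background scheduler task to stop and waits up to five
    /// seconds for it to finish.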
pub async fn shutdown(&self) {
self.shutdown.store(true, Ordering::Release);
let mut handle_guard = self.scheduler_handle.lock().await;
if let Some(handle) = handle_guard.take() {
match tokio::time::timeout(tokio::time::Duration::from_secs(5), handle).await {
Ok(_) => tracing::debug!("Redis job queue scheduler stopped cleanly"),
Err(_) => tracing::warn!("Redis job queue scheduler did not stop within timeout"),
}
}
}
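    /// Sends a Redis `PING` and records the result in the cached health flag
    /// reported by [`JobQueue::is_healthy`].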
pub async fn ping(&self) -> bool {
match self.get_connection().await {
Ok(mut conn) => {
let result: redis::RedisResult<String> =
redis::cmd("PING").query_async(&mut conn).await;
let healthy = result.is_ok();
self.health_status.store(healthy, Ordering::Release);
healthy
}
Err(e) => {
tracing::warn!("Redis job queue ping failed: {}", e);
self.health_status.store(false, Ordering::Release);
false
}
}
}
async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection> {
self.client
.get_multiplexed_async_connection()
.await
.map_err(|e| TidewayError::internal(format!("Failed to get Redis connection: {}", e)))
}
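    /// Spawns a background task that, roughly once per second, refreshes this
    /// worker's heartbeat, reclaims jobs from workers whose heartbeat has
    /// expired, and moves due scheduled jobs onto the pending list.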
fn start_scheduler_task(&self) {
let client = self.client.clone();
let worker_id = self.worker_id.clone();
let shutdown = self.shutdown.clone();
let scheduler_handle = self.scheduler_handle.clone();
let handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop {
if shutdown.load(Ordering::Acquire) {
tracing::debug!("Redis job queue scheduler shutting down");
break;
}
interval.tick().await;
if shutdown.load(Ordering::Acquire) {
break;
}
if let Ok(mut conn) = client.get_multiplexed_async_connection().await {
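                    // Register this worker and refresh its heartbeat TTL so the
                    // stale-worker reclaim pass can tell live workers from crashed ones.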
let heartbeat_key = worker_heartbeat_key(&worker_id);
let heartbeat_update: redis::RedisResult<()> = redis::pipe()
.cmd("SADD")
.arg(WORKERS_KEY)
.arg(&worker_id)
.ignore()
.cmd("SETEX")
.arg(&heartbeat_key)
.arg(WORKER_HEARTBEAT_TTL_SECONDS)
.arg("1")
.ignore()
.query_async(&mut conn)
.await;
if let Err(error) = heartbeat_update {
tracing::warn!(
error = %error,
worker_id = %worker_id,
"Failed to update Redis job worker heartbeat"
);
continue;
}
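                    // Reclaim jobs stranded on the processing lists of workers
                    // whose heartbeat has expired.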
let reclaimed: redis::RedisResult<i64> =
redis::Script::new(RECLAIM_STALE_WORKERS_LUA)
.key(WORKERS_KEY)
.key(PENDING_JOBS_KEY)
.arg(WORKER_HEARTBEAT_KEY_PREFIX)
.arg(PROCESSING_KEY_PREFIX)
.invoke_async(&mut conn)
.await;
match reclaimed {
Ok(count) if count > 0 => {
tracing::warn!(
reclaimed_jobs = count,
worker_id = %worker_id,
"Reclaimed orphaned jobs from stale workers"
);
}
Ok(_) => {}
Err(error) => {
tracing::warn!(
error = %error,
worker_id = %worker_id,
"Failed to reclaim jobs from stale workers"
);
}
}
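                    // Promote scheduled jobs (including backoff retries) whose
                    // due time has passed onto the pending list.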
let now = Utc::now().timestamp();
let moved: redis::RedisResult<i64> = redis::Script::new(MOVE_DUE_JOBS_LUA)
.key(SCHEDULED_JOBS_KEY)
.key(PENDING_JOBS_KEY)
.arg(now)
.invoke_async(&mut conn)
.await;
if let Err(error) = moved {
tracing::warn!(
error = %error,
"Failed to move due scheduled jobs to pending queue"
);
}
}
}
});
if let Ok(mut guard) = scheduler_handle.try_lock() {
*guard = Some(handle);
} else {
handle.abort();
tracing::error!("Failed to store scheduler handle - this should not happen");
}
}
}
#[async_trait]
impl JobQueue for RedisJobQueue {
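    /// Serializes the job into a new `JobData` record and pushes it onto the
    /// pending list.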
async fn enqueue(&self, job: &dyn Job) -> Result<String> {
let job_id = Uuid::new_v4().to_string();
let payload = job.serialize()?;
let job_data = JobData::new(
job_id.clone(),
job.job_type().to_string(),
payload,
self.max_retries,
);
let job_json = serde_json::to_string(&job_data)
.map_err(|e| TidewayError::internal(format!("Failed to serialize job: {}", e)))?;
let mut conn = self.get_connection().await?;
redis::cmd("LPUSH")
.arg(PENDING_JOBS_KEY)
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to enqueue job: {}", e)))?;
Ok(job_id)
}
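    /// Blocks for up to five seconds waiting for a pending job, atomically
    /// moving it onto this worker's processing list (`BRPOPLPUSH`) so it can
    /// be reclaimed if the worker dies before acknowledging it.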
async fn dequeue(&self) -> Result<Option<JobData>> {
let mut conn = self.get_connection().await?;
let processing_key = processing_key(&self.worker_id);
let result: Option<String> = redis::cmd("BRPOPLPUSH")
.arg(PENDING_JOBS_KEY)
.arg(&processing_key)
            .arg(5)
            .query_async(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to dequeue job: {}", e)))?;
if let Some(job_json) = result {
let job_data: JobData = serde_json::from_str(&job_json)
.map_err(|e| TidewayError::internal(format!("Failed to deserialize job: {}", e)))?;
Ok(Some(job_data))
} else {
Ok(None)
}
}
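    /// Removes the acknowledged job from this worker's processing list by
    /// scanning the list for the matching job id.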
async fn complete(&self, job_id: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let processing_key = processing_key(&self.worker_id);
let jobs: Vec<String> = redis::cmd("LRANGE")
.arg(&processing_key)
.arg(0)
.arg(-1)
.query_async(&mut conn)
.await
.map_err(|e| {
TidewayError::internal(format!("Failed to list processing jobs: {}", e))
})?;
for job_json in jobs {
if let Ok(job_data) = serde_json::from_str::<JobData>(&job_json) {
if job_data.job_id == job_id {
redis::cmd("LREM")
.arg(&processing_key)
.arg(1)
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| {
TidewayError::internal(format!(
"Failed to remove job from processing: {}",
e
))
})?;
return Ok(());
}
}
}
Ok(())
}
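    /// Removes the failed job from the processing list; if retries remain, it
    /// is rescheduled with exponential backoff
    /// (`retry_backoff_seconds * 2^retry_count`), otherwise it is pushed onto
    /// the failed list. The error message is currently not persisted.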
async fn fail(&self, job_id: &str, _error: String) -> Result<()> {
let mut conn = self.get_connection().await?;
let processing_key = processing_key(&self.worker_id);
let jobs: Vec<String> = redis::cmd("LRANGE")
.arg(&processing_key)
.arg(0)
.arg(-1)
.query_async(&mut conn)
.await
.map_err(|e| {
TidewayError::internal(format!("Failed to list processing jobs: {}", e))
})?;
for job_json in jobs {
if let Ok(mut job_data) = serde_json::from_str::<JobData>(&job_json) {
if job_data.job_id == job_id {
redis::cmd("LREM")
.arg(&processing_key)
.arg(1)
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| {
TidewayError::internal(format!(
"Failed to remove job from processing: {}",
e
))
})?;
if job_data.should_retry() {
let backoff_seconds =
self.retry_backoff_seconds * (2_u64.pow(job_data.retry_count));
let retry_at = Utc::now() + Duration::seconds(backoff_seconds as i64);
job_data.increment_retry();
let retry_json = serde_json::to_string(&job_data).map_err(|e| {
TidewayError::internal(format!(
"Failed to serialize job for retry: {}",
e
))
})?;
redis::cmd("ZADD")
.arg(SCHEDULED_JOBS_KEY)
.arg(retry_at.timestamp())
.arg(&retry_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| {
TidewayError::internal(format!("Failed to schedule retry: {}", e))
})?;
} else {
redis::cmd("LPUSH")
.arg("jobs:failed")
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| {
TidewayError::internal(format!(
"Failed to add to failed list: {}",
e
))
})?;
}
return Ok(());
}
}
}
Ok(())
}
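    /// Removes the job from the processing list and, if retries remain,
    /// re-queues it immediately on the pending list without backoff; otherwise
    /// it is pushed onto the failed list.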
async fn retry(&self, job_id: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let processing_key = processing_key(&self.worker_id);
let jobs: Vec<String> = redis::cmd("LRANGE")
.arg(&processing_key)
.arg(0)
.arg(-1)
.query_async(&mut conn)
.await
.map_err(|e| {
TidewayError::internal(format!("Failed to list processing jobs: {}", e))
})?;
for job_json in jobs {
if let Ok(mut job_data) = serde_json::from_str::<JobData>(&job_json) {
if job_data.job_id == job_id {
redis::cmd("LREM")
.arg(&processing_key)
.arg(1)
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| {
TidewayError::internal(format!(
"Failed to remove job from processing: {}",
e
))
})?;
if job_data.should_retry() {
job_data.increment_retry();
let retry_json = serde_json::to_string(&job_data).map_err(|e| {
TidewayError::internal(format!(
"Failed to serialize job for retry: {}",
e
))
})?;
redis::cmd("LPUSH")
.arg(PENDING_JOBS_KEY)
.arg(&retry_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| {
TidewayError::internal(format!("Failed to retry job: {}", e))
})?;
} else {
redis::cmd("LPUSH")
.arg("jobs:failed")
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| {
TidewayError::internal(format!(
"Failed to add to failed list: {}",
e
))
})?;
}
return Ok(());
}
}
}
Ok(())
}
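    /// Adds the job to the scheduled sorted set, scored by `run_at`; the
    /// background scheduler task moves it to the pending list once it is due.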
#[cfg(feature = "jobs")]
async fn schedule(&self, job: &dyn Job, run_at: DateTime<Utc>) -> Result<String> {
let job_id = Uuid::new_v4().to_string();
let payload = job.serialize()?;
let job_data = JobData::scheduled(
job_id.clone(),
job.job_type().to_string(),
payload,
self.max_retries,
run_at,
);
let job_json = serde_json::to_string(&job_data)
.map_err(|e| TidewayError::internal(format!("Failed to serialize job: {}", e)))?;
let mut conn = self.get_connection().await?;
redis::cmd("ZADD")
.arg(SCHEDULED_JOBS_KEY)
.arg(run_at.timestamp())
.arg(&job_json)
.query_async::<()>(&mut conn)
.await
.map_err(|e| TidewayError::internal(format!("Failed to schedule job: {}", e)))?;
Ok(job_id)
}
fn is_healthy(&self) -> bool {
self.health_status.load(Ordering::Acquire)
}
}