use redis::{Client, Commands, IntoConnectionInfo, RedisResult, Script};
use crate::types::Backend;
use crate::Error;
/// Redis-backed implementation of [`Backend`].
///
/// Wraps a [`Client`]; every backend operation in this file calls
/// `Client::get_connection` on it before talking to the server.
#[derive(Debug, Clone)]
pub struct Redis {
// Client handle from which each operation requests its connection.
client: Client,
}
impl Redis {
    /// Builds a backend from anything that converts into Redis connection info.
    ///
    /// # Panics
    ///
    /// Panics with `"Failed to create Redis client"` when `Client::open`
    /// returns an error for `connection_params`.
    pub fn new<T: IntoConnectionInfo>(connection_params: T) -> Self {
        Self::new_with_client(
            Client::open(connection_params).expect("Failed to create Redis client"),
        )
    }

    /// Builds a backend around an already constructed [`Client`].
    pub fn new_with_client(client: Client) -> Self {
        Self { client }
    }

    /// Key of the waiting list: `<queue>:waiting`.
    fn key_waiting(&self, queue: &str) -> String {
        [queue, ":waiting"].concat()
    }

    /// Key of the delayed sorted set: `<queue>:delayed`.
    fn key_delayed(&self, queue: &str) -> String {
        [queue, ":delayed"].concat()
    }

    /// Key of the active list: `<queue>:active`.
    fn key_active(&self, queue: &str) -> String {
        [queue, ":active"].concat()
    }

    /// Key of the job-payload hash: `<queue>:storage`.
    fn key_storage(&self, queue: &str) -> String {
        [queue, ":storage"].concat()
    }

    /// Key of a job's lock: `aj:lock:<job_id>`.
    ///
    /// The key does not include the queue name, and the Lua scripts in this
    /// file rebuild the same `aj:lock:` prefix by hand, so the two must stay
    /// in sync.
    fn key_lock(&self, job_id: &str) -> String {
        ["aj:lock:", job_id].concat()
    }
}
/// Lua script that promotes due jobs from the delayed sorted set to the
/// waiting list.
///
/// * `KEYS[1]` - delayed sorted set.
/// * `KEYS[2]` - waiting list.
/// * `ARGV[1]` - upper score bound; every member with a score from `-inf` up
///   to this value is selected via `ZRANGEBYSCORE`.
///
/// Each selected member is removed from the sorted set (`ZREM`) and appended
/// to the waiting list (`RPUSH`). The script returns how many members it moved.
const LUA_DELAYED_MOVE_READY: &str = r#"
local delayed_key = KEYS[1]
local waiting_key = KEYS[2]
local now_ms = tonumber(ARGV[1])
local ready = redis.call('ZRANGEBYSCORE', delayed_key, '-inf', now_ms)
local count = 0
for i, job_id in ipairs(ready) do
redis.call('ZREM', delayed_key, job_id)
redis.call('RPUSH', waiting_key, job_id)
count = count + 1
end
return count
"#;
/// Lua script that pops the head of the waiting list and locks it for a worker.
///
/// * `KEYS[1]` - waiting list.
/// * `KEYS[2]` - active list.
/// * `ARGV[1]` - worker id stored as the lock value.
/// * `ARGV[2]` - lock TTL, passed to `SET ... PX`.
///
/// Returns the claimed job id, or nil when the waiting list is empty or the
/// lock `aj:lock:<job_id>` could not be set with `NX`. In the latter case the
/// job id is pushed back onto the head of the waiting list (`LPUSH`).
///
/// NOTE(review): the lock key is built inside the script and is not declared
/// in `KEYS`; confirm this is acceptable for the target deployment (Redis
/// documentation asks for every accessed key to be passed via `KEYS`).
/// NOTE(review): because a job that fails to lock goes back to the head, the
/// next claim will presumably pop the same id again until that lock is gone -
/// confirm this is intended.
const LUA_CLAIM_JOB: &str = r#"
local waiting_key = KEYS[1]
local active_key = KEYS[2]
local worker_id = ARGV[1]
local lock_ttl = tonumber(ARGV[2])
local job_id = redis.call('LPOP', waiting_key)
if not job_id then
return nil
end
local lock_key = 'aj:lock:' .. job_id
local acquired = redis.call('SET', lock_key, worker_id, 'NX', 'PX', lock_ttl)
if acquired then
redis.call('RPUSH', active_key, job_id)
return job_id
else
-- Failed to acquire lock, put job back
redis.call('LPUSH', waiting_key, job_id)
return nil
end
"#;
/// Lua script that deletes a lock only when the caller owns it.
///
/// * `KEYS[1]` - lock key.
/// * `ARGV[1]` - worker id expected as the lock's current value.
///
/// Returns the result of `DEL` when the stored value equals the worker id,
/// otherwise `0` (the lock is left untouched).
const LUA_LOCK_RELEASE: &str = r#"
local lock_key = KEYS[1]
local worker_id = ARGV[1]
if redis.call('GET', lock_key) == worker_id then
return redis.call('DEL', lock_key)
end
return 0
"#;
/// Lua script that resets a lock's TTL only when the caller owns it.
///
/// * `KEYS[1]` - lock key.
/// * `ARGV[1]` - worker id expected as the lock's current value.
/// * `ARGV[2]` - new TTL, passed to `PEXPIRE`.
///
/// Returns the result of `PEXPIRE` when the stored value equals the worker
/// id, otherwise `0` (the TTL is left untouched).
const LUA_LOCK_EXTEND: &str = r#"
local lock_key = KEYS[1]
local worker_id = ARGV[1]
local ttl_ms = tonumber(ARGV[2])
if redis.call('GET', lock_key) == worker_id then
return redis.call('PEXPIRE', lock_key, ttl_ms)
end
return 0
"#;
/// Lua script that moves active jobs without a lock back to the waiting list.
///
/// * `KEYS[1]` - active list.
/// * `KEYS[2]` - waiting list.
///
/// Reads the whole active list (`LRANGE 0 -1`). For every job id whose
/// `aj:lock:<job_id>` key does not exist, it removes one occurrence from the
/// active list (`LREM ... 1`) and appends the id to the waiting list
/// (`RPUSH`). Returns the list of job ids that were moved.
///
/// NOTE(review): the lock keys are built inside the script and are not
/// declared in `KEYS`; confirm this is acceptable for the target deployment.
const LUA_REQUEUE_ORPHANED: &str = r#"
local active_key = KEYS[1]
local waiting_key = KEYS[2]
local orphaned = {}
local job_ids = redis.call('LRANGE', active_key, 0, -1)
for i, job_id in ipairs(job_ids) do
local lock_key = 'aj:lock:' .. job_id
if redis.call('EXISTS', lock_key) == 0 then
redis.call('LREM', active_key, 1, job_id)
redis.call('RPUSH', waiting_key, job_id)
table.insert(orphaned, job_id)
end
end
return orphaned
"#;
impl Backend for Redis {
    /// Appends `job_id` to the tail of the queue's waiting list (`RPUSH`).
    fn waiting_push(&self, queue: &str, job_id: &str) -> Result<(), Error> {
        let mut con = self.client.get_connection()?;
        con.rpush::<_, _, ()>(self.key_waiting(queue), job_id)?;
        Ok(())
    }

    /// Pops the head of the waiting list (`LPOP`); `None` when nothing was popped.
    fn waiting_pop(&self, queue: &str) -> Result<Option<String>, Error> {
        let mut con = self.client.get_connection()?;
        Ok(con.lpop(self.key_waiting(queue), None)?)
    }

    /// Length of the waiting list (`LLEN`).
    fn waiting_len(&self, queue: &str) -> Result<usize, Error> {
        let mut con = self.client.get_connection()?;
        Ok(con.llen(self.key_waiting(queue))?)
    }

    /// Adds `job_id` to the delayed sorted set with `run_at_ms` as its score (`ZADD`).
    fn delayed_push(&self, queue: &str, job_id: &str, run_at_ms: i64) -> Result<(), Error> {
        let mut con = self.client.get_connection()?;
        con.zadd::<_, _, _, ()>(self.key_delayed(queue), job_id, run_at_ms)?;
        Ok(())
    }

    /// Runs `LUA_DELAYED_MOVE_READY` with `now_ms` as the score bound and
    /// returns the count the script reports.
    fn delayed_move_ready(&self, queue: &str, now_ms: i64) -> Result<usize, Error> {
        let mut con = self.client.get_connection()?;
        let moved: usize = Script::new(LUA_DELAYED_MOVE_READY)
            .key(self.key_delayed(queue))
            .key(self.key_waiting(queue))
            .arg(now_ms)
            .invoke(&mut con)?;
        Ok(moved)
    }

    /// Removes `job_id` from the delayed sorted set (`ZREM`).
    fn delayed_remove(&self, queue: &str, job_id: &str) -> Result<(), Error> {
        let mut con = self.client.get_connection()?;
        con.zrem::<_, _, ()>(self.key_delayed(queue), job_id)?;
        Ok(())
    }

    /// Number of members in the delayed sorted set (`ZCARD`).
    fn delayed_len(&self, queue: &str) -> Result<usize, Error> {
        let mut con = self.client.get_connection()?;
        Ok(con.zcard(self.key_delayed(queue))?)
    }

    /// Appends `job_id` to the tail of the active list (`RPUSH`).
    fn active_push(&self, queue: &str, job_id: &str) -> Result<(), Error> {
        let mut con = self.client.get_connection()?;
        con.rpush::<_, _, ()>(self.key_active(queue), job_id)?;
        Ok(())
    }

    /// Removes one occurrence of `job_id` from the active list (`LREM` with count 1).
    fn active_remove(&self, queue: &str, job_id: &str) -> Result<(), Error> {
        let mut con = self.client.get_connection()?;
        con.lrem::<_, _, ()>(self.key_active(queue), 1, job_id)?;
        Ok(())
    }

    /// Length of the active list (`LLEN`).
    fn active_len(&self, queue: &str) -> Result<usize, Error> {
        let mut con = self.client.get_connection()?;
        Ok(con.llen(self.key_active(queue))?)
    }

    /// Every job id currently in the active list (`LRANGE 0 -1`).
    fn active_list(&self, queue: &str) -> Result<Vec<String>, Error> {
        let mut con = self.client.get_connection()?;
        Ok(con.lrange(self.key_active(queue), 0, -1)?)
    }

    /// Stores `data` under field `job_id` of the queue's storage hash (`HSET`).
    fn job_save(&self, queue: &str, job_id: &str, data: &str) -> Result<(), Error> {
        let mut con = self.client.get_connection()?;
        con.hset::<_, _, _, ()>(self.key_storage(queue), job_id, data)?;
        Ok(())
    }

    /// Reads field `job_id` of the storage hash (`HGET`); `None` when absent.
    fn job_get(&self, queue: &str, job_id: &str) -> Result<Option<String>, Error> {
        let mut con = self.client.get_connection()?;
        Ok(con.hget(self.key_storage(queue), job_id)?)
    }

    /// Deletes field `job_id` from the storage hash (`HDEL`).
    fn job_delete(&self, queue: &str, job_id: &str) -> Result<(), Error> {
        let mut con = self.client.get_connection()?;
        con.hdel::<_, _, ()>(self.key_storage(queue), job_id)?;
        Ok(())
    }

    /// Tries to create the job's lock with `SET <key> <worker_id> NX PX <ttl_ms>`.
    ///
    /// Returns `true` when the command produced a reply value and `false` when
    /// it produced none (the `NX` condition was not met).
    fn lock_acquire(&self, job_id: &str, worker_id: &str, ttl_ms: u64) -> Result<bool, Error> {
        let mut con = self.client.get_connection()?;
        let mut set_cmd = redis::cmd("SET");
        set_cmd
            .arg(self.key_lock(job_id))
            .arg(worker_id)
            .arg("NX")
            .arg("PX")
            .arg(ttl_ms);
        let reply: Option<String> = set_cmd.query(&mut con)?;
        Ok(reply.is_some())
    }

    /// Runs `LUA_LOCK_RELEASE`; `true` only when the script returned `1`.
    fn lock_release(&self, job_id: &str, worker_id: &str) -> Result<bool, Error> {
        let mut con = self.client.get_connection()?;
        let deleted: i32 = Script::new(LUA_LOCK_RELEASE)
            .key(self.key_lock(job_id))
            .arg(worker_id)
            .invoke(&mut con)?;
        Ok(deleted == 1)
    }

    /// Runs `LUA_LOCK_EXTEND` with the new TTL; `true` only when the script returned `1`.
    fn lock_extend(&self, job_id: &str, worker_id: &str, ttl_ms: u64) -> Result<bool, Error> {
        let mut con = self.client.get_connection()?;
        let extended: i32 = Script::new(LUA_LOCK_EXTEND)
            .key(self.key_lock(job_id))
            .arg(worker_id)
            .arg(ttl_ms)
            .invoke(&mut con)?;
        Ok(extended == 1)
    }

    /// Runs `LUA_CLAIM_JOB` for `worker_id` and returns the job id the script
    /// handed out, or `None` when it returned nil.
    fn claim_job(
        &self,
        queue: &str,
        worker_id: &str,
        lock_ttl_ms: u64,
    ) -> Result<Option<String>, Error> {
        let mut con = self.client.get_connection()?;
        let claimed: Option<String> = Script::new(LUA_CLAIM_JOB)
            .key(self.key_waiting(queue))
            .key(self.key_active(queue))
            .arg(worker_id)
            .arg(lock_ttl_ms)
            .invoke(&mut con)?;
        Ok(claimed)
    }

    /// Removes the job from the active list, then releases its lock.
    ///
    /// Returns `Ok(true)` whenever both calls succeed.
    ///
    /// NOTE(review): the boolean from `lock_release` is discarded, so `true`
    /// is returned even when `worker_id` did not hold the lock - confirm this
    /// matches the `Backend` contract.
    fn complete_job(&self, queue: &str, job_id: &str, worker_id: &str) -> Result<bool, Error> {
        self.active_remove(queue, job_id)
            .and_then(|()| self.lock_release(job_id, worker_id))
            .map(|_released| true)
    }

    /// Removes the job from the active list, then releases its lock.
    ///
    /// Performs the same two steps as `complete_job` and likewise returns
    /// `Ok(true)` whenever both calls succeed, regardless of whether the lock
    /// was actually released.
    fn fail_job(&self, queue: &str, job_id: &str, worker_id: &str) -> Result<bool, Error> {
        self.active_remove(queue, job_id)
            .and_then(|()| self.lock_release(job_id, worker_id))
            .map(|_released| true)
    }

    /// Runs `LUA_REQUEUE_ORPHANED` and returns the job ids the script moved
    /// from the active list back to the waiting list.
    fn requeue_orphaned(&self, queue: &str) -> Result<Vec<String>, Error> {
        let mut con = self.client.get_connection()?;
        let requeued: Vec<String> = Script::new(LUA_REQUEUE_ORPHANED)
            .key(self.key_active(queue))
            .key(self.key_waiting(queue))
            .invoke(&mut con)?;
        Ok(requeued)
    }
}
#[cfg(test)]
mod tests {
    //! Integration tests; they talk to the Redis server at
    //! `redis://localhost:6379/`.

    use super::*;
    use uuid::Uuid;

    /// Backend connected to the local test server.
    fn test_redis() -> Redis {
        Redis::new("redis://localhost:6379/")
    }

    /// Fresh queue name so tests never share list/set/hash keys.
    fn unique_queue() -> String {
        format!("test:{}", Uuid::new_v4())
    }

    /// Fresh job id.
    ///
    /// Lock keys are `aj:lock:<job_id>` and are NOT namespaced by queue, so a
    /// fixed id such as "job1" would be shared by every test that locks it.
    /// Tests run on parallel threads by default, which made such tests steal
    /// or release each other's locks.
    fn unique_job(label: &str) -> String {
        format!("{}:{}", label, Uuid::new_v4())
    }

    /// Deletes the four per-queue keys created by a test.
    fn cleanup(redis: &Redis, queue: &str) {
        let mut conn = redis.client.get_connection().unwrap();
        let _: () = redis::cmd("DEL")
            .arg(redis.key_waiting(queue))
            .arg(redis.key_delayed(queue))
            .arg(redis.key_active(queue))
            .arg(redis.key_storage(queue))
            .query(&mut conn)
            .unwrap();
    }

    #[test]
    fn test_waiting_queue() {
        let redis = test_redis();
        let queue = unique_queue();

        redis.waiting_push(&queue, "job1").unwrap();
        redis.waiting_push(&queue, "job2").unwrap();
        assert_eq!(redis.waiting_len(&queue).unwrap(), 2);

        // FIFO order: pushed at the tail, popped from the head.
        assert_eq!(redis.waiting_pop(&queue).unwrap(), Some("job1".to_string()));
        assert_eq!(redis.waiting_pop(&queue).unwrap(), Some("job2".to_string()));
        assert_eq!(redis.waiting_pop(&queue).unwrap(), None);

        cleanup(&redis, &queue);
    }

    #[test]
    fn test_delayed_queue() {
        let redis = test_redis();
        let queue = unique_queue();

        redis.delayed_push(&queue, "job1", 1000).unwrap();
        redis.delayed_push(&queue, "job2", 2000).unwrap();
        redis.delayed_push(&queue, "job3", 3000).unwrap();
        assert_eq!(redis.delayed_len(&queue).unwrap(), 3);

        // Only the jobs scheduled at or before 2500 become ready.
        let moved = redis.delayed_move_ready(&queue, 2500).unwrap();
        assert_eq!(moved, 2);
        assert_eq!(redis.delayed_len(&queue).unwrap(), 1);
        assert_eq!(redis.waiting_len(&queue).unwrap(), 2);
        assert_eq!(redis.waiting_pop(&queue).unwrap(), Some("job1".to_string()));
        assert_eq!(redis.waiting_pop(&queue).unwrap(), Some("job2".to_string()));

        cleanup(&redis, &queue);
    }

    #[test]
    fn test_claim_job() {
        let redis = test_redis();
        let queue = unique_queue();
        // Unique ids keep this test's lock key private to it.
        let job1 = unique_job("job1");
        let job2 = unique_job("job2");

        redis.waiting_push(&queue, &job1).unwrap();
        redis.waiting_push(&queue, &job2).unwrap();

        let job = redis.claim_job(&queue, "worker1", 30000).unwrap();
        assert_eq!(job, Some(job1.clone()));
        assert_eq!(redis.waiting_len(&queue).unwrap(), 1);
        assert_eq!(redis.active_len(&queue).unwrap(), 1);

        let mut conn = redis.client.get_connection().unwrap();
        let lock_value: Option<String> = conn.get(redis.key_lock(&job1)).unwrap();
        assert_eq!(lock_value, Some("worker1".to_string()));

        cleanup(&redis, &queue);
        let _: () = conn.del(redis.key_lock(&job1)).unwrap();
    }

    #[test]
    fn test_lock_operations() {
        let redis = test_redis();
        let job_id = format!("job:{}", Uuid::new_v4());

        assert!(redis.lock_acquire(&job_id, "worker1", 30000).unwrap());
        assert!(!redis.lock_acquire(&job_id, "worker2", 30000).unwrap());

        // Only the owner may extend or release.
        assert!(redis.lock_extend(&job_id, "worker1", 60000).unwrap());
        assert!(!redis.lock_extend(&job_id, "worker2", 60000).unwrap());
        assert!(!redis.lock_release(&job_id, "worker2").unwrap());
        assert!(redis.lock_release(&job_id, "worker1").unwrap());

        assert!(redis.lock_acquire(&job_id, "worker2", 30000).unwrap());
        redis.lock_release(&job_id, "worker2").unwrap();
    }

    #[test]
    fn test_requeue_orphaned() {
        let redis = test_redis();
        let queue = unique_queue();
        // Unique ids keep this test's lock key private to it.
        let job1 = unique_job("job1");
        let job2 = unique_job("job2");

        redis.active_push(&queue, &job1).unwrap();
        redis.active_push(&queue, &job2).unwrap();
        // job1 keeps a live lock; job2 has none and must be treated as orphaned.
        assert!(redis.lock_acquire(&job1, "worker1", 30000).unwrap());

        let orphaned = redis.requeue_orphaned(&queue).unwrap();
        assert_eq!(orphaned, vec![job2.clone()]);
        assert_eq!(redis.active_len(&queue).unwrap(), 1);
        assert_eq!(redis.waiting_len(&queue).unwrap(), 1);
        assert_eq!(redis.waiting_pop(&queue).unwrap(), Some(job2));

        cleanup(&redis, &queue);
        redis.lock_release(&job1, "worker1").unwrap();
    }

    #[test]
    fn test_job_storage() {
        let redis = test_redis();
        let queue = unique_queue();

        redis.job_save(&queue, "job1", r#"{"data": 1}"#).unwrap();
        let data = redis.job_get(&queue, "job1").unwrap();
        assert_eq!(data, Some(r#"{"data": 1}"#.to_string()));

        redis.job_delete(&queue, "job1").unwrap();
        assert_eq!(redis.job_get(&queue, "job1").unwrap(), None);

        cleanup(&redis, &queue);
    }
}