use async_trait::async_trait;
use chrono::{DateTime, Utc};
use redis::{AsyncCommands, Client, RedisResult, aio::ConnectionManager};
use serde_json;
use std::collections::HashMap;
use tokio::time::timeout;
use super::{MonitoringApi, RedisConfig, Storage, StorageError};
use crate::core::{Job, JobState, JobStateKind, RecurringJob, ServerInfo};
pub struct RedisStorage {
connection_manager: ConnectionManager,
config: RedisConfig,
}
impl RedisStorage {
pub async fn with_config(config: RedisConfig) -> Result<Self, StorageError> {
let client = Client::open(config.full_url()).map_err(|e| {
StorageError::connection_with_source("Failed to create Redis client", Box::new(e))
})?;
let connection_manager = timeout(config.connection_timeout, ConnectionManager::new(client))
.await
.map_err(|_| StorageError::timeout(config.connection_timeout.as_millis() as u64))?
.map_err(|e| {
StorageError::connection_with_source(
"Failed to create connection manager",
Box::new(e),
)
})?;
Ok(Self {
connection_manager,
config,
})
}
async fn get_connection(&self) -> Result<ConnectionManager, StorageError> {
timeout(self.config.connection_timeout, async {
Ok(self.connection_manager.clone())
})
.await
.map_err(|_| StorageError::timeout(self.config.connection_timeout.as_millis() as u64))?
}
async fn with_timeout<F, T>(&self, operation: F) -> Result<T, StorageError>
where
F: std::future::Future<Output = RedisResult<T>>,
{
timeout(self.config.command_timeout, operation)
.await
.map_err(|_| StorageError::timeout(self.config.command_timeout.as_millis() as u64))?
.map_err(|e| {
StorageError::operation_failed_with_source(
"Redis command",
e.to_string(),
Box::new(e),
)
})
}
fn job_key(&self, job_id: &str) -> String {
format!("{}:jobs:{}", self.config.key_prefix, job_id)
}
fn state_index_key(&self, state: &str) -> String {
format!("{}:state:{}", self.config.key_prefix, state)
}
fn available_jobs_key(&self) -> String {
format!("{}:available", self.config.key_prefix)
}
fn job_counts_key(&self) -> String {
format!("{}:counts", self.config.key_prefix)
}
fn all_jobs_key(&self) -> String {
format!("{}:all", self.config.key_prefix)
}
fn recurring_key(&self, id: &str) -> String {
format!("{}:recurring:{}", self.config.key_prefix, id)
}
fn recurring_index_key(&self) -> String {
format!("{}:recurring:index", self.config.key_prefix)
}
fn server_key(&self, server_id: &str) -> String {
format!("{}:server:{}", self.config.key_prefix, server_id)
}
fn servers_index_key(&self) -> String {
format!("{}:servers", self.config.key_prefix)
}
fn named_lock_key(&self, resource: &str) -> String {
format!("{}:lock:{}", self.config.key_prefix, resource)
}
fn state_to_string(state: &JobState) -> String {
match state {
JobState::Enqueued { .. } => "enqueued".to_string(),
JobState::Processing { .. } => "processing".to_string(),
JobState::Succeeded { .. } => "succeeded".to_string(),
JobState::Failed { .. } => "failed".to_string(),
JobState::Deleted { .. } => "deleted".to_string(),
JobState::Scheduled { .. } => "scheduled".to_string(),
JobState::AwaitingRetry { .. } => "awaiting_retry".to_string(),
}
}
fn is_job_available(job: &Job) -> bool {
let now = Utc::now();
match &job.state {
JobState::Enqueued { .. } => true,
JobState::Scheduled { enqueue_at, .. } => *enqueue_at <= now,
JobState::AwaitingRetry { retry_at, .. } => *retry_at <= now,
_ => false,
}
}
async fn update_job_indices(
&self,
job: &Job,
old_state: Option<&JobState>,
) -> Result<(), StorageError> {
let mut conn = self.get_connection().await?;
let state_str = Self::state_to_string(&job.state);
let state_key = self.state_index_key(&state_str);
let all_jobs_key = self.all_jobs_key();
let available_key = self.available_jobs_key();
let counts_key = self.job_counts_key();
if let Some(old_state) = old_state {
let old_state_str = Self::state_to_string(old_state);
let old_state_key = self.state_index_key(&old_state_str);
self.with_timeout::<_, ()>(conn.srem(&old_state_key, &job.id))
.await?;
self.with_timeout::<_, ()>(conn.hincr(&counts_key, &old_state_str, -1))
.await?;
}
self.with_timeout::<_, ()>(conn.sadd(&state_key, &job.id))
.await?;
self.with_timeout::<_, ()>(conn.sadd(&all_jobs_key, &job.id))
.await?;
self.with_timeout::<_, ()>(conn.hincr(&counts_key, &state_str, 1))
.await?;
if Self::is_job_available(job) {
let score =
job.priority as f64 + (job.created_at.timestamp_millis() as f64 / 1_000_000.0);
self.with_timeout::<_, ()>(conn.zadd(&available_key, &job.id, score))
.await?;
} else {
self.with_timeout::<_, ()>(conn.zrem(&available_key, &job.id))
.await?;
}
match job.state {
JobState::Succeeded { .. } => {
if let Some(ttl) = self.config.completed_job_ttl {
let job_key = self.job_key(&job.id);
self.with_timeout::<_, ()>(conn.expire(&job_key, ttl.as_secs() as i64))
.await?;
}
}
JobState::Failed { .. } => {
if let Some(ttl) = self.config.failed_job_ttl {
let job_key = self.job_key(&job.id);
self.with_timeout::<_, ()>(conn.expire(&job_key, ttl.as_secs() as i64))
.await?;
}
}
_ => {}
}
Ok(())
}
async fn remove_job_indices(&self, job_id: &str, job: &Job) -> Result<(), StorageError> {
let mut conn = self.get_connection().await?;
let state_str = Self::state_to_string(&job.state);
let state_key = self.state_index_key(&state_str);
let all_jobs_key = self.all_jobs_key();
let available_key = self.available_jobs_key();
let counts_key = self.job_counts_key();
self.with_timeout::<_, ()>(conn.srem(&state_key, job_id))
.await?;
self.with_timeout::<_, ()>(conn.srem(&all_jobs_key, job_id))
.await?;
self.with_timeout::<_, ()>(conn.zrem(&available_key, job_id))
.await?;
self.with_timeout::<_, ()>(conn.hincr(&counts_key, &state_str, -1))
.await?;
Ok(())
}
}
#[async_trait]
impl MonitoringApi for RedisStorage {
async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError> {
let mut conn = self.get_connection().await?;
let job_key = self.job_key(job_id);
let job_json: Option<String> = self.with_timeout(conn.get(&job_key)).await?;
match job_json {
Some(json) => {
let job: Job = serde_json::from_str(&json).map_err(|e| {
StorageError::serialization_with_source(
"Failed to deserialize job",
Box::new(e),
)
})?;
Ok(Some(job))
}
None => Ok(None),
}
}
async fn update(&self, job: &Job) -> Result<(), StorageError> {
let job_key = self.job_key(&job.id);
let current_job = self.get(&job.id).await?;
let old_state = current_job.as_ref().map(|j| &j.state);
if current_job.is_none() {
return Err(StorageError::job_not_found(job.id.clone()));
}
let mut conn = self.get_connection().await?;
let job_json = serde_json::to_string(job).map_err(|e| {
StorageError::serialization_with_source("Failed to serialize job", Box::new(e))
})?;
self.with_timeout::<_, ()>(conn.set(&job_key, job_json))
.await?;
self.update_job_indices(job, old_state).await?;
Ok(())
}
async fn delete(&self, job_id: &str) -> Result<bool, StorageError> {
let job = match self.get(job_id).await? {
Some(job) => job,
None => return Ok(false),
};
let mut conn = self.get_connection().await?;
let job_key = self.job_key(job_id);
let deleted: i32 = self.with_timeout(conn.del(&job_key)).await?;
if deleted > 0 {
self.remove_job_indices(job_id, &job).await?;
Ok(true)
} else {
Ok(false)
}
}
async fn list(
&self,
state_filter: Option<&JobState>,
limit: Option<usize>,
offset: Option<usize>,
) -> Result<Vec<Job>, StorageError> {
let mut conn = self.get_connection().await?;
let job_ids: Vec<String> = if let Some(state) = state_filter {
let state_str = Self::state_to_string(state);
let state_key = self.state_index_key(&state_str);
self.with_timeout(conn.smembers(&state_key)).await?
} else {
let all_jobs_key = self.all_jobs_key();
self.with_timeout(conn.smembers(&all_jobs_key)).await?
};
let mut jobs = Vec::new();
for job_id in job_ids {
if let Some(job) = self.get(&job_id).await? {
jobs.push(job);
}
}
jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
let start = offset.unwrap_or(0);
let end = if let Some(limit) = limit {
std::cmp::min(start + limit, jobs.len())
} else {
jobs.len()
};
if start >= jobs.len() {
Ok(vec![])
} else {
Ok(jobs[start..end].to_vec())
}
}
async fn get_job_counts(&self) -> Result<HashMap<JobStateKind, usize>, StorageError> {
let mut conn = self.get_connection().await?;
let counts_key = self.job_counts_key();
let raw_counts: HashMap<String, i32> = self.with_timeout(conn.hgetall(&counts_key)).await?;
let mut counts = HashMap::new();
for (state_str, count) in raw_counts {
if count > 0 {
let kind = match state_str.as_str() {
"enqueued" => JobStateKind::Enqueued,
"processing" => JobStateKind::Processing,
"succeeded" => JobStateKind::Succeeded,
"failed" => JobStateKind::Failed,
"deleted" => JobStateKind::Deleted,
"scheduled" => JobStateKind::Scheduled,
"awaiting_retry" => JobStateKind::AwaitingRetry,
_ => continue,
};
counts.insert(kind, count as usize);
}
}
Ok(counts)
}
}
#[async_trait]
impl Storage for RedisStorage {
async fn enqueue(&self, job: &Job) -> Result<(), StorageError> {
let mut conn = self.get_connection().await?;
let job_key = self.job_key(&job.id);
let job_json = serde_json::to_string(job).map_err(|e| {
StorageError::serialization_with_source("Failed to serialize job", Box::new(e))
})?;
self.with_timeout::<_, ()>(conn.set(&job_key, job_json))
.await?;
self.update_job_indices(job, None).await?;
Ok(())
}
async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError> {
let mut conn = self.get_connection().await?;
let available_key = self.available_jobs_key();
let count = limit.unwrap_or(-1_isize as usize) as isize;
let job_ids: Vec<String> = self
.with_timeout(conn.zrevrange(&available_key, 0, count - 1))
.await?;
let mut jobs = Vec::new();
for job_id in job_ids {
if let Some(job) = self.get(&job_id).await? {
if Self::is_job_available(&job) {
jobs.push(job);
if let Some(limit) = limit
&& jobs.len() >= limit
{
break;
}
}
}
}
Ok(jobs)
}
async fn fetch_due_scheduled_jobs(
&self,
now: DateTime<Utc>,
limit: usize,
) -> Result<Vec<Job>, StorageError> {
let mut conn = self.get_connection().await?;
let state_key = self.state_index_key("scheduled");
let job_ids: Vec<String> = self.with_timeout(conn.smembers(&state_key)).await?;
let mut due = Vec::new();
for job_id in job_ids {
if let Some(job) = self.get(&job_id).await?
&& let JobState::Scheduled { enqueue_at, .. } = &job.state
&& *enqueue_at <= now
{
due.push(job);
}
}
due.sort_by(|a, b| {
b.priority
.cmp(&a.priority)
.then_with(|| a.created_at.cmp(&b.created_at))
});
due.truncate(limit);
Ok(due)
}
async fn fetch_due_retry_jobs(
&self,
now: DateTime<Utc>,
limit: usize,
) -> Result<Vec<Job>, StorageError> {
let mut conn = self.get_connection().await?;
let state_key = self.state_index_key("awaiting_retry");
let job_ids: Vec<String> = self.with_timeout(conn.smembers(&state_key)).await?;
let mut due = Vec::new();
for job_id in job_ids {
if let Some(job) = self.get(&job_id).await?
&& let JobState::AwaitingRetry { retry_at, .. } = &job.state
&& *retry_at <= now
{
due.push(job);
}
}
due.sort_by(|a, b| {
b.priority
.cmp(&a.priority)
.then_with(|| a.created_at.cmp(&b.created_at))
});
due.truncate(limit);
Ok(due)
}
async fn requeue_stranded_jobs(
&self,
stale_before: DateTime<Utc>,
) -> Result<usize, StorageError> {
let mut conn = self.get_connection().await?;
let processing_key = self.state_index_key("processing");
let job_ids: Vec<String> = self.with_timeout(conn.smembers(&processing_key)).await?;
let mut recovered = 0;
for job_id in job_ids {
let Some(mut job) = self.get(&job_id).await? else {
continue;
};
let stale = matches!(
&job.state,
JobState::Processing { started_at, .. } if *started_at < stale_before
);
if !stale {
continue;
}
job.state = JobState::enqueued(&job.queue);
self.update(&job).await?;
recovered += 1;
}
Ok(recovered)
}
async fn fetch_and_lock_job(
&self,
worker_id: &str,
_queues: Option<&[String]>,
) -> Result<Option<Job>, StorageError> {
let mut conn = self.get_connection().await?;
let lua_script = r#"
local available_key = KEYS[1]
local worker_id = ARGV[1]
local current_time = tonumber(ARGV[2])
-- Get the job with highest priority (lowest score)
local job_ids = redis.call('ZRANGEBYSCORE', available_key, '-inf', '+inf', 'LIMIT', 0, 1)
if #job_ids == 0 then
return nil -- No jobs available
end
local job_id = job_ids[1]
local job_key = 'qml:job:' .. job_id
-- Get the job data
local job_data = redis.call('GET', job_key)
if not job_data then
-- Job was deleted, remove from available set
redis.call('ZREM', available_key, job_id)
return nil
end
-- Parse job to check if it's still available. JobState is
-- externally tagged by serde, so the variant surfaces as a single
-- key on job.state (e.g. Enqueued, AwaitingRetry). Any job whose
-- variant is not Enqueued or AwaitingRetry is not eligible.
local job = cjson.decode(job_data)
if not (job.state.Enqueued or job.state.AwaitingRetry) then
redis.call('ZREM', available_key, job_id)
return nil
end
-- Mark job as processing, matching the externally-tagged layout
-- so Rust can deserialize it back into JobState::Processing.
job.state = {
Processing = {
worker_id = worker_id,
started_at = current_time,
server_name = 'redis-storage'
}
}
job.updated_at = current_time
-- Update job in Redis
redis.call('SET', job_key, cjson.encode(job))
-- Remove from available jobs and update indices
redis.call('ZREM', available_key, job_id)
redis.call('SREM', 'qml:state:enqueued', job_id)
redis.call('SREM', 'qml:state:awaiting_retry', job_id)
redis.call('SADD', 'qml:state:processing', job_id)
-- Update counters
redis.call('HINCRBY', 'qml:counts', 'enqueued', -1)
redis.call('HINCRBY', 'qml:counts', 'awaiting_retry', -1)
redis.call('HINCRBY', 'qml:counts', 'processing', 1)
return job_data
"#;
let available_key = self.available_jobs_key();
let current_time = chrono::Utc::now().timestamp_millis();
let result: Option<String> = redis::Script::new(lua_script)
.key(&available_key)
.arg(worker_id)
.arg(current_time)
.invoke_async(&mut conn)
.await
.map_err(|e| StorageError::OperationError {
message: format!("Failed to fetch and lock job: {}", e),
})?;
if let Some(job_json) = result {
let job: Job = serde_json::from_str(&job_json).map_err(|e| {
StorageError::serialization_with_source("Failed to parse job", Box::new(e))
})?;
Ok(Some(job))
} else {
Ok(None)
}
}
async fn try_acquire_job_lock(
&self,
job_id: &str,
worker_id: &str,
timeout_seconds: u64,
) -> Result<bool, StorageError> {
let mut conn = self.get_connection().await?;
let lock_key = format!("qml:lock:{}", job_id);
let result: Option<String> = self
.with_timeout::<_, Option<String>>(
redis::cmd("SET")
.arg(&lock_key)
.arg(worker_id)
.arg("NX")
.arg("EX")
.arg(timeout_seconds)
.query_async(&mut conn),
)
.await?;
Ok(result.is_some())
}
async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError> {
let mut conn = self.get_connection().await?;
let lua_script = r#"
local lock_key = KEYS[1]
local worker_id = ARGV[1]
local current_owner = redis.call('GET', lock_key)
if current_owner == worker_id then
redis.call('DEL', lock_key)
return 1
else
return 0
end
"#;
let lock_key = format!("qml:lock:{}", job_id);
let result: i32 = redis::Script::new(lua_script)
.key(&lock_key)
.arg(worker_id)
.invoke_async(&mut conn)
.await
.map_err(|e| StorageError::OperationError {
message: format!("Failed to release job lock: {}", e),
})?;
Ok(result == 1)
}
async fn fetch_available_jobs_atomic(
&self,
worker_id: &str,
limit: Option<usize>,
_queues: Option<&[String]>,
) -> Result<Vec<Job>, StorageError> {
let mut jobs = Vec::new();
let fetch_limit = limit.unwrap_or(10).min(50);
for _ in 0..fetch_limit {
match self.fetch_and_lock_job(worker_id, _queues).await? {
Some(job) => jobs.push(job),
None => break, }
}
Ok(jobs)
}
async fn upsert_recurring_job(&self, job: &RecurringJob) -> Result<(), StorageError> {
let mut conn = self.get_connection().await?;
let key = self.recurring_key(&job.id);
let json = serde_json::to_string(job).map_err(|e| {
StorageError::serialization_with_source(
"Failed to serialize recurring job",
Box::new(e),
)
})?;
self.with_timeout::<_, ()>(conn.set(&key, json)).await?;
let score = job.next_run_at.timestamp_millis() as f64;
self.with_timeout::<_, ()>(conn.zadd(self.recurring_index_key(), &job.id, score))
.await?;
Ok(())
}
async fn remove_recurring_job(&self, id: &str) -> Result<bool, StorageError> {
let mut conn = self.get_connection().await?;
let key = self.recurring_key(id);
let removed: i32 = self.with_timeout::<_, i32>(conn.del(&key)).await?;
let _: i32 = self
.with_timeout::<_, i32>(conn.zrem(self.recurring_index_key(), id))
.await?;
Ok(removed > 0)
}
async fn list_recurring_jobs(&self) -> Result<Vec<RecurringJob>, StorageError> {
let mut conn = self.get_connection().await?;
let ids: Vec<String> = self
.with_timeout::<_, Vec<String>>(conn.zrange(self.recurring_index_key(), 0, -1))
.await?;
let mut out = Vec::with_capacity(ids.len());
for id in ids {
let key = self.recurring_key(&id);
let json: Option<String> = self
.with_timeout::<_, Option<String>>(conn.get(&key))
.await?;
if let Some(s) = json {
let r: RecurringJob = serde_json::from_str(&s).map_err(|e| {
StorageError::serialization_with_source(
"Failed to parse recurring job",
Box::new(e),
)
})?;
out.push(r);
}
}
out.sort_by(|a, b| a.id.cmp(&b.id));
Ok(out)
}
async fn fetch_due_recurring_jobs(
&self,
now: DateTime<Utc>,
limit: usize,
) -> Result<Vec<RecurringJob>, StorageError> {
let mut conn = self.get_connection().await?;
let index = self.recurring_index_key();
let now_ms = now.timestamp_millis() as f64;
let ids: Vec<String> = self
.with_timeout::<_, Vec<String>>(conn.zrangebyscore_limit(
&index,
f64::NEG_INFINITY,
now_ms,
0,
limit as isize,
))
.await?;
let mut claimed = Vec::new();
let park_ms = (now + chrono::Duration::days(3650)).timestamp_millis() as f64;
for id in ids {
let claim_key = format!("{}:recurring:claim:{}", self.config.key_prefix, id);
let claimed_ok: Option<String> = self
.with_timeout::<_, Option<String>>(
redis::cmd("SET")
.arg(&claim_key)
.arg("1")
.arg("NX")
.arg("EX")
.arg(60i64)
.query_async(&mut conn),
)
.await?;
if claimed_ok.is_none() {
continue;
}
let key = self.recurring_key(&id);
let json: Option<String> = self
.with_timeout::<_, Option<String>>(conn.get(&key))
.await?;
let Some(s) = json else { continue };
let r: RecurringJob = serde_json::from_str(&s).map_err(|e| {
StorageError::serialization_with_source(
"Failed to parse recurring job",
Box::new(e),
)
})?;
if !r.enabled || r.next_run_at > now {
continue;
}
self.with_timeout::<_, ()>(conn.zadd(&index, &id, park_ms))
.await?;
claimed.push(r);
if claimed.len() >= limit {
break;
}
}
Ok(claimed)
}
async fn delete_expired_jobs(&self, now: DateTime<Utc>) -> Result<usize, StorageError> {
let mut conn = self.get_connection().await?;
let all_key = self.all_jobs_key();
let ids: Vec<String> = self
.with_timeout::<_, Vec<String>>(conn.smembers(&all_key))
.await?;
let mut removed = 0usize;
for id in ids {
let key = self.job_key(&id);
let json: Option<String> = self
.with_timeout::<_, Option<String>>(conn.get(&key))
.await?;
let Some(s) = json else { continue };
let job: Job = match serde_json::from_str(&s) {
Ok(j) => j,
Err(_) => continue,
};
let expired = match job.expires_at {
Some(ts) => ts < now,
None => false,
};
if !expired {
continue;
}
self.remove_job_indices(&id, &job).await?;
let _: i32 = self.with_timeout::<_, i32>(conn.del(&key)).await?;
removed += 1;
}
Ok(removed)
}
async fn register_server(&self, info: &ServerInfo) -> Result<(), StorageError> {
let mut conn = self.get_connection().await?;
let key = self.server_key(&info.server_id);
let index = self.servers_index_key();
let json = serde_json::to_string(info).map_err(|e| {
StorageError::serialization_with_source("Failed to serialize ServerInfo", Box::new(e))
})?;
self.with_timeout::<_, ()>(conn.set(&key, json)).await?;
self.with_timeout::<_, ()>(conn.sadd(&index, &info.server_id))
.await?;
Ok(())
}
async fn heartbeat_server(
&self,
server_id: &str,
now: DateTime<Utc>,
) -> Result<bool, StorageError> {
let mut conn = self.get_connection().await?;
let key = self.server_key(server_id);
let json: Option<String> = self
.with_timeout::<_, Option<String>>(conn.get(&key))
.await?;
let Some(s) = json else {
return Ok(false);
};
let mut info: ServerInfo = serde_json::from_str(&s).map_err(|e| {
StorageError::serialization_with_source("Failed to parse ServerInfo", Box::new(e))
})?;
info.last_heartbeat = now;
let updated = serde_json::to_string(&info).map_err(|e| {
StorageError::serialization_with_source("Failed to serialize ServerInfo", Box::new(e))
})?;
self.with_timeout::<_, ()>(conn.set(&key, updated)).await?;
Ok(true)
}
async fn deregister_server(&self, server_id: &str) -> Result<bool, StorageError> {
let mut conn = self.get_connection().await?;
let key = self.server_key(server_id);
let index = self.servers_index_key();
let deleted: i32 = self.with_timeout::<_, i32>(conn.del(&key)).await?;
let _: i32 = self
.with_timeout::<_, i32>(conn.srem(&index, server_id))
.await?;
Ok(deleted > 0)
}
async fn list_dead_servers(
&self,
stale_before: DateTime<Utc>,
) -> Result<Vec<ServerInfo>, StorageError> {
let mut conn = self.get_connection().await?;
let index = self.servers_index_key();
let ids: Vec<String> = self
.with_timeout::<_, Vec<String>>(conn.smembers(&index))
.await?;
let mut dead = Vec::new();
for id in ids {
let key = self.server_key(&id);
let json: Option<String> = self
.with_timeout::<_, Option<String>>(conn.get(&key))
.await?;
let Some(s) = json else {
let _: i32 = self.with_timeout::<_, i32>(conn.srem(&index, &id)).await?;
continue;
};
let info: ServerInfo = match serde_json::from_str(&s) {
Ok(i) => i,
Err(_) => continue,
};
if info.last_heartbeat < stale_before {
dead.push(info);
}
}
Ok(dead)
}
async fn reclaim_jobs_from_server(&self, server_id: &str) -> Result<usize, StorageError> {
let mut conn = self.get_connection().await?;
let processing_key = self.state_index_key("processing");
let job_ids: Vec<String> = self.with_timeout(conn.smembers(&processing_key)).await?;
let mut reclaimed = 0;
for job_id in job_ids {
let Some(mut job) = self.get(&job_id).await? else {
continue;
};
let owned = matches!(
&job.state,
JobState::Processing { server_name, .. } if server_name == server_id
);
if !owned {
continue;
}
job.state = JobState::enqueued(&job.queue);
self.update(&job).await?;
reclaimed += 1;
}
Ok(reclaimed)
}
async fn try_acquire_lock(
&self,
resource: &str,
owner: &str,
ttl: std::time::Duration,
) -> Result<bool, StorageError> {
let ttl_ms = ttl.as_millis() as u64;
let script = redis::Script::new(
r#"
local cur = redis.call('GET', KEYS[1])
if cur == false or cur == ARGV[1] then
redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])
return 1
else
return 0
end
"#,
);
let key = self.named_lock_key(resource);
let mut conn = self.get_connection().await?;
let acquired: i64 = self
.with_timeout(
script
.key(&key)
.arg(owner)
.arg(ttl_ms)
.invoke_async(&mut conn),
)
.await?;
Ok(acquired == 1)
}
async fn release_lock(&self, resource: &str, owner: &str) -> Result<bool, StorageError> {
let script = redis::Script::new(
r#"
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
"#,
);
let key = self.named_lock_key(resource);
let mut conn = self.get_connection().await?;
let deleted: i64 = self
.with_timeout(script.key(&key).arg(owner).invoke_async(&mut conn))
.await?;
Ok(deleted == 1)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::Job;
use chrono::Duration;
fn test_redis_config() -> RedisConfig {
RedisConfig::new()
.with_url("redis://127.0.0.1:6379")
.with_key_prefix("qml_test")
.with_database(1) }
async fn create_test_storage() -> Option<RedisStorage> {
RedisStorage::with_config(test_redis_config()).await.ok()
}
fn create_test_job() -> Job {
Job::new("test_job", serde_json::json!(["test_arg".to_string()]))
}
#[tokio::test]
async fn test_redis_storage_basic_operations() {
let storage = match create_test_storage().await {
Some(storage) => storage,
None => {
println!("Skipping Redis test - Redis not available");
return;
}
};
let job = create_test_job();
let _ = storage.delete(&job.id).await;
assert!(storage.enqueue(&job).await.is_ok());
let retrieved = storage.get(&job.id).await.unwrap();
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().id, job.id);
let mut updated_job = job.clone();
updated_job.state = JobState::processing("worker1", "server1");
assert!(storage.update(&updated_job).await.is_ok());
let retrieved = storage.get(&job.id).await.unwrap().unwrap();
assert!(matches!(retrieved.state, JobState::Processing { .. }));
let deleted = storage.delete(&job.id).await.unwrap();
assert!(deleted);
let retrieved = storage.get(&job.id).await.unwrap();
assert!(retrieved.is_none());
}
#[tokio::test]
async fn test_redis_storage_list_operations() {
let storage = match create_test_storage().await {
Some(storage) => storage,
None => {
println!("Skipping Redis test - Redis not available");
return;
}
};
let all_jobs = storage.list(None, None, None).await.unwrap();
for job in all_jobs {
let _ = storage.delete(&job.id).await;
}
let mut job1 = create_test_job();
job1.state = JobState::enqueued("default");
let mut job2 = create_test_job();
job2.state = JobState::processing("worker1", "server1");
let mut job3 = create_test_job();
job3.state = JobState::succeeded(100, None);
storage.enqueue(&job1).await.unwrap();
storage.enqueue(&job2).await.unwrap();
storage.enqueue(&job3).await.unwrap();
let all_jobs = storage.list(None, None, None).await.unwrap();
assert_eq!(all_jobs.len(), 3);
let enqueued_state = JobState::enqueued("test");
let enqueued_jobs = storage
.list(Some(&enqueued_state), None, None)
.await
.unwrap();
assert_eq!(enqueued_jobs.len(), 1);
storage.delete(&job1.id).await.unwrap();
storage.delete(&job2.id).await.unwrap();
storage.delete(&job3.id).await.unwrap();
}
#[tokio::test]
async fn test_redis_storage_available_jobs() {
let storage = match create_test_storage().await {
Some(storage) => storage,
None => {
println!("Skipping Redis test - Redis not available");
return;
}
};
let all_jobs = storage.list(None, None, None).await.unwrap();
for job in all_jobs {
let _ = storage.delete(&job.id).await;
}
let mut job1 = create_test_job();
job1.state = JobState::enqueued("default");
job1.priority = 10;
let mut job2 = create_test_job();
job2.state = JobState::scheduled(Utc::now() - Duration::hours(1), "delay");
job2.priority = 5;
let mut job3 = create_test_job();
job3.state = JobState::processing("worker1", "server1");
storage.enqueue(&job1).await.unwrap();
storage.enqueue(&job2).await.unwrap();
storage.enqueue(&job3).await.unwrap();
let available = storage.get_available_jobs(None).await.unwrap();
assert_eq!(available.len(), 2);
assert_eq!(available[0].priority, 10);
storage.delete(&job1.id).await.unwrap();
storage.delete(&job2.id).await.unwrap();
storage.delete(&job3.id).await.unwrap();
}
#[tokio::test]
async fn test_redis_storage_update_nonexistent() {
let storage = match create_test_storage().await {
Some(storage) => storage,
None => {
println!("Skipping Redis test - Redis not available");
return;
}
};
let job = create_test_job();
let result = storage.update(&job).await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
StorageError::JobNotFound { .. }
));
}
}