use crate::{Error, JobPayload, QueueConfig};
use chrono::{DateTime, Utc};
use redis::aio::ConnectionManager;
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::debug;
/// Aggregate snapshot across all inspected queues, as produced by
/// [`QueueConnection::get_stats`].
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct QueueStats {
    /// Per-queue pending/delayed counters, one entry per queried queue.
    pub queues: Vec<SingleQueueStats>,
    /// Length of the shared failed-job list (global, not per queue).
    pub total_failed: usize,
}
/// Counters for a single named queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SingleQueueStats {
    /// Queue name.
    pub name: String,
    /// Number of jobs currently on the ready list.
    pub pending: usize,
    /// Number of jobs currently in the delayed sorted set.
    pub delayed: usize,
}
/// Read-only view of a job, as returned by the inspection methods on
/// [`QueueConnection`] (`get_pending_jobs`, `get_delayed_jobs`,
/// `get_failed_jobs`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobInfo {
    /// Job id, rendered as a string.
    pub id: String,
    /// Registered job type name.
    pub job_type: String,
    /// Queue the job belongs to.
    pub queue: String,
    /// Attempts made so far.
    pub attempts: u32,
    /// Configured retry limit for this job.
    pub max_retries: u32,
    /// When the job was created.
    pub created_at: DateTime<Utc>,
    /// When the job becomes (or became) available to run.
    pub available_at: DateTime<Utc>,
    /// Which storage the job was observed in (pending list, delayed set,
    /// or failed list).
    pub state: JobState,
}
/// Lifecycle state of an observed job.
///
/// Serialized in snake_case, e.g. `"pending"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
    /// On the ready list, waiting to be popped by a worker.
    Pending,
    /// In the delayed sorted set, not yet due.
    Delayed,
    /// On the failed-job list.
    Failed,
}
/// A failed job together with its failure metadata, as returned by
/// [`QueueConnection::get_failed_jobs`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailedJobInfo {
    /// Snapshot of the job at the time of failure.
    pub job: JobInfo,
    /// Error message captured from the failing attempt.
    pub error: String,
    /// When the failure was recorded.
    pub failed_at: DateTime<Utc>,
}
/// Deserialization-only shape of entries stored on the failed-job list.
///
/// Must stay field-compatible with the `FailedJob` struct serialized inside
/// [`QueueConnection::fail`].
#[derive(Debug, Deserialize)]
struct StoredFailedJob {
    /// The full job payload at the time of failure.
    payload: JobPayload,
    /// Error message recorded alongside the payload.
    error: String,
    /// Timestamp of when the failure was recorded.
    failed_at: DateTime<Utc>,
}
/// Cloneable handle to the Redis-backed queue.
///
/// Wraps a [`ConnectionManager`] plus the shared [`QueueConfig`]; clones of
/// this handle share both.
#[derive(Clone)]
pub struct QueueConnection {
    // Cloned at each call site to obtain a mutable command handle from `&self`.
    conn: ConnectionManager,
    // Arc so clones of the handle share one config instead of copying it.
    config: Arc<QueueConfig>,
}
impl QueueConnection {
pub async fn new(config: QueueConfig) -> Result<Self, Error> {
let client = redis::Client::open(config.redis_url.as_str())
.map_err(|e| Error::ConnectionFailed(e.to_string()))?;
let conn = ConnectionManager::new(client)
.await
.map_err(|e| Error::ConnectionFailed(e.to_string()))?;
Ok(Self {
conn,
config: Arc::new(config),
})
}
pub fn config(&self) -> &QueueConfig {
&self.config
}
pub async fn push(&self, payload: JobPayload) -> Result<(), Error> {
let queue = &payload.queue;
let json = payload.to_json()?;
if payload.is_available() {
let key = self.config.queue_key(queue);
self.conn
.clone()
.lpush::<_, _, ()>(&key, &json)
.await
.map_err(Error::Redis)?;
debug!(queue = queue, job_id = %payload.id, "Job pushed to queue");
} else {
let key = self.config.delayed_key(queue);
let score = payload.available_at.timestamp() as f64;
self.conn
.clone()
.zadd::<_, _, _, ()>(&key, &json, score)
.await
.map_err(Error::Redis)?;
debug!(
queue = queue,
job_id = %payload.id,
available_at = %payload.available_at,
"Job pushed to delayed queue"
);
}
Ok(())
}
pub async fn pop(&self, queue: &str) -> Result<Option<JobPayload>, Error> {
let key = self.config.queue_key(queue);
let timeout = self.config.block_timeout.as_secs() as f64;
let result: Option<(String, String)> = self
.conn
.clone()
.brpop(&key, timeout)
.await
.map_err(Error::Redis)?;
match result {
Some((_, json)) => {
let mut payload = JobPayload::from_json(&json)?;
payload.reserve();
Ok(Some(payload))
}
None => Ok(None),
}
}
pub async fn pop_nowait(&self, queue: &str) -> Result<Option<JobPayload>, Error> {
let key = self.config.queue_key(queue);
let result: Option<String> = self
.conn
.clone()
.rpop(&key, None)
.await
.map_err(Error::Redis)?;
match result {
Some(json) => {
let mut payload = JobPayload::from_json(&json)?;
payload.reserve();
Ok(Some(payload))
}
None => Ok(None),
}
}
pub async fn migrate_delayed(&self, queue: &str) -> Result<usize, Error> {
let delayed_key = self.config.delayed_key(queue);
let queue_key = self.config.queue_key(queue);
let now = chrono::Utc::now().timestamp() as f64;
let ready_jobs: Vec<String> = self
.conn
.clone()
.zrangebyscore(&delayed_key, "-inf", now)
.await
.map_err(Error::Redis)?;
let count = ready_jobs.len();
for job in ready_jobs {
self.conn
.clone()
.zrem::<_, _, ()>(&delayed_key, &job)
.await
.map_err(Error::Redis)?;
self.conn
.clone()
.lpush::<_, _, ()>(&queue_key, &job)
.await
.map_err(Error::Redis)?;
}
if count > 0 {
debug!(queue = queue, count = count, "Migrated delayed jobs");
}
Ok(count)
}
pub async fn release(
&self,
mut payload: JobPayload,
delay: std::time::Duration,
) -> Result<(), Error> {
payload.increment_attempts();
payload.reserved_at = None;
if delay.is_zero() {
payload.available_at = chrono::Utc::now();
} else {
payload.available_at =
chrono::Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
}
self.push(payload).await
}
pub async fn fail(&self, payload: JobPayload, error: &Error) -> Result<(), Error> {
let failed_key = self.config.failed_key();
#[derive(serde::Serialize)]
struct FailedJob {
payload: JobPayload,
error: String,
failed_at: chrono::DateTime<chrono::Utc>,
}
let failed = FailedJob {
payload,
error: error.to_string(),
failed_at: chrono::Utc::now(),
};
let json = serde_json::to_string(&failed)
.map_err(|e| Error::SerializationFailed(e.to_string()))?;
self.conn
.clone()
.lpush::<_, _, ()>(&failed_key, &json)
.await
.map_err(Error::Redis)?;
Ok(())
}
pub async fn size(&self, queue: &str) -> Result<usize, Error> {
let key = self.config.queue_key(queue);
let len: usize = self.conn.clone().llen(&key).await.map_err(Error::Redis)?;
Ok(len)
}
pub async fn delayed_size(&self, queue: &str) -> Result<usize, Error> {
let key = self.config.delayed_key(queue);
let len: usize = self.conn.clone().zcard(&key).await.map_err(Error::Redis)?;
Ok(len)
}
pub async fn clear(&self, queue: &str) -> Result<(), Error> {
let queue_key = self.config.queue_key(queue);
let delayed_key = self.config.delayed_key(queue);
self.conn
.clone()
.del::<_, ()>(&queue_key)
.await
.map_err(Error::Redis)?;
self.conn
.clone()
.del::<_, ()>(&delayed_key)
.await
.map_err(Error::Redis)?;
Ok(())
}
pub async fn get_pending_jobs(&self, queue: &str, limit: usize) -> Result<Vec<JobInfo>, Error> {
let key = self.config.queue_key(queue);
let jobs: Vec<String> = self
.conn
.clone()
.lrange(&key, 0, limit as isize - 1)
.await
.map_err(Error::Redis)?;
let mut result = Vec::with_capacity(jobs.len());
for json in jobs {
if let Ok(payload) = JobPayload::from_json(&json) {
result.push(JobInfo {
id: payload.id.to_string(),
job_type: payload.job_type,
queue: payload.queue,
attempts: payload.attempts,
max_retries: payload.max_retries,
created_at: payload.created_at,
available_at: payload.available_at,
state: JobState::Pending,
});
}
}
Ok(result)
}
pub async fn get_delayed_jobs(&self, queue: &str, limit: usize) -> Result<Vec<JobInfo>, Error> {
let key = self.config.delayed_key(queue);
let jobs: Vec<String> = self
.conn
.clone()
.zrange(&key, 0, limit as isize - 1)
.await
.map_err(Error::Redis)?;
let mut result = Vec::with_capacity(jobs.len());
for json in jobs {
if let Ok(payload) = JobPayload::from_json(&json) {
result.push(JobInfo {
id: payload.id.to_string(),
job_type: payload.job_type,
queue: payload.queue,
attempts: payload.attempts,
max_retries: payload.max_retries,
created_at: payload.created_at,
available_at: payload.available_at,
state: JobState::Delayed,
});
}
}
Ok(result)
}
pub async fn get_failed_jobs(&self, limit: usize) -> Result<Vec<FailedJobInfo>, Error> {
let key = self.config.failed_key();
let jobs: Vec<String> = self
.conn
.clone()
.lrange(&key, 0, limit as isize - 1)
.await
.map_err(Error::Redis)?;
let mut result = Vec::with_capacity(jobs.len());
for json in jobs {
if let Ok(failed) = serde_json::from_str::<StoredFailedJob>(&json) {
result.push(FailedJobInfo {
job: JobInfo {
id: failed.payload.id.to_string(),
job_type: failed.payload.job_type,
queue: failed.payload.queue,
attempts: failed.payload.attempts,
max_retries: failed.payload.max_retries,
created_at: failed.payload.created_at,
available_at: failed.payload.available_at,
state: JobState::Failed,
},
error: failed.error,
failed_at: failed.failed_at,
});
}
}
Ok(result)
}
pub async fn failed_count(&self) -> Result<usize, Error> {
let key = self.config.failed_key();
let len: usize = self.conn.clone().llen(&key).await.map_err(Error::Redis)?;
Ok(len)
}
pub async fn get_stats(&self, queues: &[&str]) -> Result<QueueStats, Error> {
let mut stats = QueueStats::default();
for queue in queues {
let pending = self.size(queue).await?;
let delayed = self.delayed_size(queue).await?;
stats.queues.push(SingleQueueStats {
name: queue.to_string(),
pending,
delayed,
});
}
stats.total_failed = self.failed_count().await?;
Ok(stats)
}
}
pub struct Queue;
impl Queue {
pub fn connection() -> &'static QueueConnection {
GLOBAL_CONNECTION
.get()
.expect("Queue not initialized. Call Queue::init() first.")
}
pub async fn init(config: QueueConfig) -> Result<(), Error> {
let conn = QueueConnection::new(config).await?;
GLOBAL_CONNECTION
.set(conn)
.map_err(|_| Error::custom("Queue already initialized"))?;
Ok(())
}
pub fn is_initialized() -> bool {
GLOBAL_CONNECTION.get().is_some()
}
}
static GLOBAL_CONNECTION: std::sync::OnceLock<QueueConnection> = std::sync::OnceLock::new();
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `JobInfo` fixture whose timestamps are both "now".
    fn job_fixture(
        id: &str,
        job_type: &str,
        queue: &str,
        attempts: u32,
        max_retries: u32,
        state: JobState,
    ) -> JobInfo {
        let stamp = Utc::now();
        JobInfo {
            id: id.to_string(),
            job_type: job_type.to_string(),
            queue: queue.to_string(),
            attempts,
            max_retries,
            created_at: stamp,
            available_at: stamp,
            state,
        }
    }

    #[test]
    fn test_queue_stats_default() {
        let defaults = QueueStats::default();
        assert_eq!(defaults.total_failed, 0);
        assert!(defaults.queues.is_empty());
    }

    #[test]
    fn test_queue_stats_serialization() {
        // Round-trip through JSON and verify the decoded fields.
        let original = QueueStats {
            queues: vec![
                SingleQueueStats {
                    name: String::from("default"),
                    pending: 5,
                    delayed: 2,
                },
                SingleQueueStats {
                    name: String::from("emails"),
                    pending: 10,
                    delayed: 0,
                },
            ],
            total_failed: 3,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: QueueStats = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.queues.len(), 2);
        assert_eq!(decoded.queues[0].name, "default");
        assert_eq!(decoded.queues[0].pending, 5);
        assert_eq!(decoded.queues[1].name, "emails");
        assert_eq!(decoded.total_failed, 3);
    }

    #[test]
    fn test_single_queue_stats_clone() {
        let original = SingleQueueStats {
            name: String::from("test"),
            pending: 10,
            delayed: 5,
        };
        let copy = original.clone();
        assert_eq!(copy.name, original.name);
        assert_eq!(copy.pending, original.pending);
        assert_eq!(copy.delayed, original.delayed);
    }

    #[test]
    fn test_job_state_serialization() {
        // snake_case rename: each variant serializes to its lowercase name.
        let expectations = [
            (JobState::Pending, "\"pending\""),
            (JobState::Delayed, "\"delayed\""),
            (JobState::Failed, "\"failed\""),
        ];
        for (state, expected) in expectations {
            assert_eq!(serde_json::to_string(&state).unwrap(), expected);
        }
    }

    #[test]
    fn test_job_state_deserialization() {
        assert!(matches!(
            serde_json::from_str::<JobState>("\"pending\"").unwrap(),
            JobState::Pending
        ));
        assert!(matches!(
            serde_json::from_str::<JobState>("\"delayed\"").unwrap(),
            JobState::Delayed
        ));
        assert!(matches!(
            serde_json::from_str::<JobState>("\"failed\"").unwrap(),
            JobState::Failed
        ));
    }

    #[test]
    fn test_job_info_serialization() {
        let info = job_fixture("job-123", "SendEmailJob", "emails", 2, 3, JobState::Pending);
        let encoded = serde_json::to_string(&info).unwrap();
        let decoded: JobInfo = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.id, "job-123");
        assert_eq!(decoded.job_type, "SendEmailJob");
        assert_eq!(decoded.queue, "emails");
        assert_eq!(decoded.attempts, 2);
        assert_eq!(decoded.max_retries, 3);
        assert!(matches!(decoded.state, JobState::Pending));
    }

    #[test]
    fn test_job_info_clone() {
        let info = job_fixture("job-456", "ProcessOrder", "orders", 0, 5, JobState::Delayed);
        let copy = info.clone();
        assert_eq!(copy.id, info.id);
        assert_eq!(copy.job_type, info.job_type);
    }

    #[test]
    fn test_failed_job_info_serialization() {
        let failure = FailedJobInfo {
            job: job_fixture("job-789", "FailingJob", "default", 3, 3, JobState::Failed),
            error: String::from("Connection refused"),
            failed_at: Utc::now(),
        };
        let encoded = serde_json::to_string(&failure).unwrap();
        let decoded: FailedJobInfo = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.job.id, "job-789");
        assert_eq!(decoded.error, "Connection refused");
        assert!(matches!(decoded.job.state, JobState::Failed));
    }

    #[test]
    fn test_failed_job_info_clone() {
        let failure = FailedJobInfo {
            job: job_fixture("job-999", "TestJob", "test", 1, 3, JobState::Failed),
            error: String::from("Test error"),
            failed_at: Utc::now(),
        };
        let copy = failure.clone();
        assert_eq!(copy.job.id, failure.job.id);
        assert_eq!(copy.error, failure.error);
    }

    #[test]
    fn test_job_state_debug() {
        for (state, needle) in [
            (JobState::Pending, "Pending"),
            (JobState::Delayed, "Delayed"),
            (JobState::Failed, "Failed"),
        ] {
            assert!(format!("{:?}", state).contains(needle));
        }
    }
}