use crate::{
storage::{Keys, PoolConfig, RedisClient, RedisMode},
config, Error, Result,
};
use crate::task::Task;
use chrono::Utc;
use fred::prelude::{RedisKey, RedisValue};
use rmp_serde;
/// Descriptor for one queue scanned by `cancel_task`:
/// (key-builder mapping a queue name to its Redis key, human-readable
/// status label for logging, and whether the backing structure is a
/// sorted set (`true`, removed via ZREM) or a list (`false`, via LREM)).
type QueueCheckFn = (fn(&str) -> String, &'static str, bool);
/// Configuration used to construct a [`Client`].
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Redis connection URL, e.g. `redis://localhost:6379`.
    pub redis_url: String,
    /// Redis deployment topology (standalone, cluster, or sentinel).
    pub redis_mode: RedisMode,
    /// Connection-pool tuning parameters.
    pub pool_config: PoolConfig,
    /// Queue name used when a task does not specify one.
    pub default_queue: String,
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
redis_url: "redis://localhost:6379".to_string(),
redis_mode: RedisMode::Standalone,
pool_config: PoolConfig::default(),
default_queue: "default".to_string(),
}
}
}
/// Task-queue client: enqueues tasks (immediate, delayed, priority, cron,
/// batch) and performs queue administration (pause/resume/flush/cancel/retry).
#[derive(Clone)]
pub struct Client {
    /// Redis connection handle. NOTE(review): presumably pool-backed and
    /// cheap to clone — confirm in `storage::RedisClient`.
    redis: RedisClient,
}
impl Client {
pub fn builder() -> ClientBuilder {
ClientBuilder::default()
}
pub async fn enqueue(&self, task: Task) -> Result<String> {
let task_id = task.id.clone();
let queue = task.queue.clone();
let unique_key = task.options.unique_key.clone();
let deps = task.options.depends_on.clone();
let priority = task.options.priority;
let task_data = rmp_serde::to_vec(&task)
.map_err(|e| Error::Serialization(e.to_string()))?;
let task_key: RedisKey = Keys::task(&task_id).into();
self.redis.hset(
task_key.clone(),
vec![
("data".into(), RedisValue::Bytes(task_data.into())),
("queue".into(), queue.as_str().into()),
],
).await?;
let task_ttl = config::get_task_ttl();
self.redis.expire(task_key, task_ttl).await?;
if let Some(deps) = deps {
if !deps.is_empty() {
let pending_deps_key: RedisKey = Keys::pending_deps(&task_id).into();
let dep_values: Vec<RedisValue> = deps.iter().map(|id| id.as_str().into()).collect();
for dep in &dep_values {
self.redis.sadd(pending_deps_key.clone(), dep.clone()).await?;
}
for dep_id in &deps {
let task_deps_key: RedisKey = Keys::task_deps(dep_id).into();
self.redis.sadd(task_deps_key, task_id.as_str().into()).await?;
}
tracing::debug!("Task {} registered with {} dependencies (not enqueued yet)", task_id, deps.len());
return Ok(task_id);
}
}
let queues_key: RedisKey = Keys::meta_queues().into();
self.redis.sadd(queues_key, queue.as_str().into()).await?;
if let Some(key) = unique_key {
let dedup_key: RedisKey = Keys::dedup(&queue).into();
let added = self.redis.dedup_add(dedup_key, key.clone().into()).await?;
if !added {
return Err(Error::Validation(format!(
"duplicate task detected: unique key '{}' already exists in queue '{}'",
key, queue
)));
}
}
if let Some(ref group) = task.options.group {
let now = chrono::Utc::now().timestamp();
let group_key: RedisKey = Keys::group(group).into();
self.redis.zadd(group_key, task_id.as_str().into(), now).await?;
let meta_group_key: RedisKey = Keys::meta_group(group).into();
self.redis.hincrby(meta_group_key.clone(), "count".into(), 1).await?;
tracing::debug!("Task {} added to group {} for aggregation", task_id, group);
return Ok(task_id);
}
const DEFAULT_PRIORITY: i32 = 50;
if priority != DEFAULT_PRIORITY {
let pqueue_key: RedisKey = Keys::priority_queue(&queue).into();
self.redis.zadd(pqueue_key, task_id.as_str().into(), priority as i64).await?;
tracing::debug!("Priority task enqueued: {} (priority: {}) to queue {}", task_id, priority, queue);
} else {
let queue_key: RedisKey = Keys::queue(&queue).into();
self.redis.rpush(queue_key, task_id.as_str().into()).await?;
tracing::debug!("Task enqueued: {} to queue {}", task_id, queue);
}
Ok(task_id)
}
pub async fn enqueue_delayed(&self, task: Task) -> Result<String> {
let task_id = task.id.clone();
let queue = task.queue.clone();
let delay = task.options.delay.ok_or(Error::Validation(
"Delay must be set for delayed tasks".into(),
))?;
let execute_at = Utc::now().timestamp().saturating_add(delay.as_secs() as i64);
let task_data = rmp_serde::to_vec(&task)
.map_err(|e| Error::Serialization(e.to_string()))?;
let task_key: RedisKey = Keys::task(&task_id).into();
self.redis.hset(
task_key.clone(),
vec![
("data".into(), RedisValue::Bytes(task_data.into())),
("queue".into(), queue.as_str().into()),
],
).await?;
let task_ttl = config::get_task_ttl();
self.redis.expire(task_key, task_ttl).await?;
let delayed_key: RedisKey = Keys::delayed(&queue).into();
self.redis.zadd(delayed_key, task_id.as_str().into(), execute_at).await?;
let queues_key: RedisKey = Keys::meta_queues().into();
self.redis.sadd(queues_key, queue.as_str().into()).await?;
tracing::debug!("Delayed task enqueued: {}", task_id);
Ok(task_id)
}
pub async fn enqueue_priority(&self, task: Task) -> Result<String> {
let task_id = task.id.clone();
let queue = task.queue.clone();
let priority = task.options.priority;
let unique_key = task.options.unique_key.clone();
let priority_range = config::get_priority_range();
if priority < priority_range.0 || priority > priority_range.1 {
return Err(Error::Validation(format!(
"Priority must be between {} and {}, got {}",
priority_range.0, priority_range.1, priority
)));
}
let task_data = rmp_serde::to_vec(&task)
.map_err(|e| Error::Serialization(e.to_string()))?;
let task_key: RedisKey = Keys::task(&task_id).into();
self.redis.hset(
task_key.clone(),
vec![
("data".into(), RedisValue::Bytes(task_data.into())),
("queue".into(), queue.as_str().into()),
],
).await?;
let task_ttl = config::get_task_ttl();
self.redis.expire(task_key, task_ttl).await?;
let pqueue_key: RedisKey = Keys::priority_queue(&queue).into();
self.redis.zadd(pqueue_key, task_id.as_str().into(), priority as i64).await?;
if let Some(key) = unique_key {
let dedup_key: RedisKey = Keys::dedup(&queue).into();
let added = self.redis.dedup_add(dedup_key, key.clone().into()).await?;
if !added {
return Err(Error::Validation(format!(
"duplicate task detected: unique key '{}' already exists in queue '{}'",
key, queue
)));
}
}
let queues_key: RedisKey = Keys::meta_queues().into();
self.redis.sadd(queues_key, queue.as_str().into()).await?;
tracing::debug!("Priority task enqueued: {} (priority: {})", task_id, priority);
Ok(task_id)
}
pub async fn enqueue_cron(&self, task: Task) -> Result<String> {
let task_id = task.id.clone();
let queue = task.queue.clone();
let cron_expr = task.options.cron.clone()
.ok_or(Error::Validation("Cron task must have cron expression".into()))?;
use cron::Schedule;
Schedule::try_from(cron_expr.as_str())
.map_err(|e| Error::Validation(format!("Invalid cron expression: {}", e)))?;
let schedule = Schedule::try_from(cron_expr.as_str())
.map_err(|e| Error::Validation(format!("Invalid cron expression: {}", e)))?;
let next_time = schedule.upcoming(chrono::Utc::now().timezone()).next()
.ok_or(Error::Validation("Could not calculate next scheduled time".into()))?
.timestamp();
let task_data = rmp_serde::to_vec(&task)
.map_err(|e| Error::Serialization(e.to_string()))?;
let task_key: RedisKey = Keys::task(&task_id).into();
self.redis.hset(
task_key.clone(),
vec![
("data".into(), RedisValue::Bytes(task_data.into())),
("queue".into(), queue.as_str().into()),
],
).await?;
let task_ttl = config::get_task_ttl();
self.redis.expire(task_key, task_ttl).await?;
let cron_key: RedisKey = Keys::cron_queue(&queue).into();
self.redis.zadd(cron_key, task_id.as_str().into(), next_time).await?;
let queues_key: RedisKey = Keys::meta_queues().into();
self.redis.sadd(queues_key, queue.as_str().into()).await?;
tracing::debug!("Cron task enqueued: {} (cron: {}, next: {})", task_id, cron_expr, next_time);
Ok(task_id)
}
pub async fn enqueue_batch(&self, tasks: Vec<Task>) -> Result<Vec<String>> {
if tasks.is_empty() {
return Ok(Vec::new());
}
let mut task_ids = Vec::with_capacity(tasks.len());
let mut registered_queues = std::collections::HashSet::new();
let task_ttl = config::get_task_ttl();
let mut pipeline = self.redis.pipeline();
for task in tasks {
let task_id = task.id.clone();
task_ids.push(task_id.clone());
let task_data = rmp_serde::to_vec(&task)
.map_err(|e| Error::Serialization(e.to_string()))?;
let task_key: RedisKey = Keys::task(&task_id).into();
pipeline = pipeline.set(task_key.clone(), RedisValue::Bytes(task_data.into()));
pipeline = pipeline.expire(task_key, task_ttl);
let queue_key: RedisKey = Keys::queue(&task.queue).into();
pipeline = pipeline.rpush(queue_key, task_id.as_str().into());
if let Some(key) = &task.options.unique_key {
let dedup_key: RedisKey = Keys::dedup(&task.queue).into();
pipeline = pipeline.sadd(dedup_key, key.as_str().into());
}
registered_queues.insert(task.queue.clone());
}
pipeline.execute().await?;
if !registered_queues.is_empty() {
let mut queues_pipeline = self.redis.pipeline();
let queues_key: RedisKey = Keys::meta_queues().into();
for queue in registered_queues {
queues_pipeline = queues_pipeline.sadd(queues_key.clone(), queue.as_str().into());
}
queues_pipeline.execute().await?;
}
tracing::debug!("Batch enqueued {} tasks", task_ids.len());
Ok(task_ids)
}
pub fn inspector(&self) -> super::Inspector {
super::Inspector::new(self.redis.clone())
}
pub async fn pause_queue(&self, queue_name: &str) -> Result<()> {
let pause_key: RedisKey = Keys::pause(queue_name).into();
self.redis.set(pause_key, RedisValue::Boolean(true)).await?;
tracing::info!("Queue '{}' paused", queue_name);
Ok(())
}
pub async fn resume_queue(&self, queue_name: &str) -> Result<()> {
let pause_key: RedisKey = Keys::pause(queue_name).into();
self.redis.del(vec![pause_key]).await?;
tracing::info!("Queue '{}' resumed", queue_name);
Ok(())
}
pub async fn flush_queue(&self, queue_name: &str) -> Result<u64> {
let queue_key: RedisKey = Keys::queue(queue_name).into();
let count = self.redis.llen(queue_key.clone()).await?;
self.redis.del(vec![queue_key]).await?;
tracing::info!("Queue '{}' flushed, {} tasks removed", queue_name, count);
Ok(count)
}
pub async fn cancel_task(&self, task_id: &str, queue_name: &str) -> Result<Option<String>> {
use crate::storage::Keys;
let queue_checks: Vec<QueueCheckFn> = vec![
(Keys::queue, "pending", false),
(Keys::active, "active", false),
(Keys::delayed, "delayed", true),
(Keys::retry, "retry", true),
(Keys::dead, "dead", false),
(Keys::priority_queue, "priority", true),
];
for (key_fn, status, is_sorted) in queue_checks {
let queue_key: RedisKey = key_fn(queue_name).into();
if !self.redis.exists(queue_key.clone()).await? {
continue;
}
let removed = if is_sorted {
self.redis.zrem(queue_key, task_id.into()).await?
} else {
self.redis.lrem(queue_key, task_id.into(), 1).await? > 0
};
if removed {
let task_key: RedisKey = Keys::task(task_id).into();
self.redis.del(vec![task_key.clone()]).await?;
let deps_key: RedisKey = Keys::pending_deps(task_id).into();
self.redis.del(vec![deps_key]).await?;
let task_deps_key: RedisKey = Keys::task_deps(task_id).into();
self.redis.del(vec![task_deps_key]).await?;
let progress_key: RedisKey = Keys::progress(task_id).into();
self.redis.del(vec![progress_key]).await?;
tracing::info!(
"Task '{}' cancelled from {} queue '{}'",
task_id,
status,
queue_name
);
return Ok(Some(status.to_string()));
}
}
tracing::debug!(
"Task '{}' not found in any queue for '{}'",
task_id,
queue_name
);
Ok(None)
}
pub async fn cancel_task_with_unique(
&self,
task_id: &str,
queue_name: &str,
unique_key: Option<&str>,
) -> Result<Option<String>> {
let result = self.cancel_task(task_id, queue_name).await?;
if result.is_some() {
if let Some(key) = unique_key {
let dedup_key: RedisKey = Keys::dedup(queue_name).into();
self.redis.srem(dedup_key, key.into()).await?;
}
}
Ok(result)
}
pub async fn retry_task(&self, task_id: &str, queue_name: &str) -> Result<bool> {
use crate::task::TaskStatus;
let dead_key: RedisKey = Keys::dead(queue_name).into();
let dead_tasks = self.redis.lrange(dead_key.clone(), 0, -1).await?;
let mut task_found = false;
let mut task_data: Option<Task> = None;
for task_id_str in dead_tasks {
if task_id_str == task_id {
task_found = true;
let task_key: RedisKey = Keys::task(task_id).into();
if let Some(data) = self.redis.hget(task_key.clone(), "data".into()).await? {
let bytes = data.as_bytes()
.ok_or_else(|| Error::Serialization("Task data is not bytes".into()))?;
task_data = Some(rmp_serde::from_slice(bytes)
.map_err(|e| Error::Serialization(e.to_string()))?);
}
break;
}
}
if !task_found {
tracing::warn!("Task '{}' not found in dead queue '{}'", task_id, queue_name);
return Ok(false);
}
let mut task = task_data.ok_or_else(|| Error::Validation("Failed to load task data".into()))?;
task.status = TaskStatus::Pending;
task.retry_cnt = 0;
task.last_error = None;
task.processed_at = None;
let task_key: RedisKey = Keys::task(task_id).into();
let new_data = rmp_serde::to_vec(&task)
.map_err(|e| Error::Serialization(e.to_string()))?;
self.redis.hset(
task_key.clone(),
vec![
("data".into(), RedisValue::Bytes(new_data.into())),
("queue".into(), RedisValue::Bytes(queue_name.to_string().into_bytes().into())),
],
).await?;
let task_ttl = config::get_task_ttl();
self.redis.expire(task_key, task_ttl).await?;
self.redis.lrem(dead_key, task_id.into(), 1).await?;
let queue_key: RedisKey = Keys::queue(queue_name).into();
self.redis.rpush(queue_key, task_id.into()).await?;
tracing::info!("Task '{}' re-queued for processing in queue '{}'", task_id, queue_name);
Ok(true)
}
}
/// Fluent builder for [`Client`], obtained via [`Client::builder`].
#[derive(Debug, Default)]
pub struct ClientBuilder {
    /// Accumulated configuration, starting from [`ClientConfig::default`].
    config: ClientConfig,
}
impl ClientBuilder {
#[must_use]
pub fn redis_url(mut self, url: impl Into<String>) -> Self {
self.config.redis_url = url.into();
self
}
#[must_use]
pub fn cluster_mode(mut self) -> Self {
self.config.redis_mode = RedisMode::Cluster;
self
}
#[must_use]
pub fn sentinel_mode(mut self) -> Self {
self.config.redis_mode = RedisMode::Sentinel;
self
}
#[must_use]
pub fn pool_size(mut self, size: usize) -> Self {
self.config.pool_config.pool_size = size;
self
}
#[must_use]
pub fn min_idle(mut self, min_idle: usize) -> Self {
self.config.pool_config.min_idle = Some(min_idle);
self
}
#[must_use]
pub fn connection_timeout(mut self, timeout: u64) -> Self {
self.config.pool_config.connection_timeout = Some(timeout);
self
}
#[must_use]
pub fn idle_timeout(mut self, timeout: u64) -> Self {
self.config.pool_config.idle_timeout = Some(timeout);
self
}
#[must_use]
pub fn max_lifetime(mut self, lifetime: u64) -> Self {
self.config.pool_config.max_lifetime = Some(lifetime);
self
}
#[must_use]
pub fn default_queue(mut self, queue: impl Into<String>) -> Self {
self.config.default_queue = queue.into();
self
}
pub async fn build(self) -> Result<Client> {
let redis = match self.config.redis_mode {
RedisMode::Standalone => RedisClient::from_url_with_pool_config(&self.config.redis_url, self.config.pool_config).await?,
RedisMode::Cluster => RedisClient::from_cluster_url_with_pool_config(&self.config.redis_url, self.config.pool_config).await?,
RedisMode::Sentinel => RedisClient::from_sentinel_url_with_pool_config(&self.config.redis_url, self.config.pool_config).await?,
};
Ok(Client {
redis,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder setter should land in the right config field.
    #[test]
    fn test_client_builder() {
        let b = Client::builder()
            .redis_url("redis://localhost:6380")
            .pool_size(20)
            .min_idle(5)
            .connection_timeout(30)
            .idle_timeout(600)
            .max_lifetime(1800)
            .default_queue("custom");
        let cfg = &b.config;
        assert_eq!(cfg.redis_url, "redis://localhost:6380");
        assert_eq!(cfg.default_queue, "custom");
        let pool = &cfg.pool_config;
        assert_eq!(pool.pool_size, 20);
        assert_eq!(pool.min_idle, Some(5));
        assert_eq!(pool.connection_timeout, Some(30));
        assert_eq!(pool.idle_timeout, Some(600));
        assert_eq!(pool.max_lifetime, Some(1800));
    }

    /// Defaults should match the documented local-dev configuration.
    #[test]
    fn test_client_config_default() {
        let cfg = ClientConfig::default();
        assert_eq!(cfg.redis_url, "redis://localhost:6379");
        assert_eq!(cfg.default_queue, "default");
        assert_eq!(cfg.pool_config.pool_size, 10);
    }

    /// PoolConfig's own builder methods should set every tuning knob.
    #[test]
    fn test_pool_config() {
        let pool = PoolConfig::new(20)
            .min_idle(5)
            .connection_timeout(30)
            .idle_timeout(600)
            .max_lifetime(1800);
        assert_eq!(pool.pool_size, 20);
        assert_eq!(pool.min_idle, Some(5));
        assert_eq!(pool.connection_timeout, Some(30));
        assert_eq!(pool.idle_timeout, Some(600));
        assert_eq!(pool.max_lifetime, Some(1800));
    }
}