use crate::backend::pagination::Pagination;
use crate::backend::rdb::redis_scripts::RedisArg;
use crate::backend::rdb::RedisBroker;
use crate::base::keys::TaskState;
use crate::base::{keys, Broker};
use crate::error::{Error, Result};
use crate::proto::{ServerInfo, TaskMessage};
use crate::task::{DailyStats, QueueInfo, QueueStats, Task, TaskInfo};
use chrono::{DateTime, TimeZone, Utc};
use prost::Message;
use redis::AsyncCommands;
use std::str::FromStr;
use std::time::Duration;
/// Returns the text between the first `{` in `key` and the next `}` after it.
///
/// Yields `None` when `key` contains no `{`, or when no `}` follows that
/// first `{`. The extracted text may be empty (for example for `"{}"`).
fn extract_queue_name_from_key(key: &str) -> Option<String> {
    let (_, after_open) = key.split_once('{')?;
    let (name, _) = after_open.split_once('}')?;
    Some(name.to_owned())
}
impl RedisBroker {
/// Returns every member of the `keys::ALL_QUEUES` set.
///
/// # Errors
/// Fails if a connection cannot be obtained or the `SMEMBERS` call fails.
pub async fn all_queues(&self) -> Result<Vec<String>> {
    let mut redis_conn = self.get_async_connection().await?;
    Ok(redis_conn.smembers(keys::ALL_QUEUES).await?)
}
/// Evaluates the `current_stats` script for `queue` and returns its raw reply.
///
/// The script receives twelve keys (the six state keys, today's processed and
/// failed counters, the two running totals, the paused key and the groups
/// key) and two args (the task-key prefix and the group-key prefix).
///
/// All keys are built through the `keys` helpers so they share one layout
/// with the rest of this file. The previous hand-written
/// `format!("asynq:{queue}:...")` strings interpolated `queue` without
/// emitting any literal braces, so they could not match the `{...}`-tagged
/// keys that `extract_queue_name_from_key` expects.
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn current_stats(&self, queue: &str) -> Result<Vec<redis::Value>> {
    let mut conn = self.get_async_connection().await?;
    let now = Utc::now();
    // NOTE(review): assumes `queue_key_prefix` ends with ':' (as the
    // hand-written "asynq:{queue}:" prefix it replaces did) — confirm.
    let queue_prefix = keys::queue_key_prefix(queue);
    let script_keys = vec![
        keys::pending_key(queue),
        keys::active_key(queue),
        keys::scheduled_key(queue),
        keys::retry_key(queue),
        keys::archived_key(queue),
        keys::completed_key(queue),
        keys::processed_key(queue, &now),
        keys::failed_key(queue, &now),
        format!("{queue_prefix}processed"),
        format!("{queue_prefix}failed"),
        keys::paused_key(queue),
        keys::groups_key(queue),
    ];
    let script_args = vec![
        RedisArg::Str(keys::task_key_prefix(queue)),
        RedisArg::Str(keys::group_key_prefix(queue)),
    ];
    let result: Vec<redis::Value> = self
        .script_manager
        .eval_script(&mut conn, "current_stats", &script_keys, &script_args)
        .await?;
    Ok(result)
}
/// Evaluates the `memory_usage` script for `queue`.
///
/// The script is given the six state keys (pending, active, scheduled,
/// retry, archived, completed) and two args: the task-key prefix and
/// `sample_size`. Its integer reply is returned unchanged.
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn memory_usage(&self, queue: &str, sample_size: i64) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![
        keys::pending_key(queue),
        keys::active_key(queue),
        keys::scheduled_key(queue),
        keys::retry_key(queue),
        keys::archived_key(queue),
        keys::completed_key(queue),
    ];
    let script_args = vec![
        RedisArg::Str(keys::task_key_prefix(queue)),
        RedisArg::Int(sample_size),
    ];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "memory_usage", &script_keys, &script_args)
        .await?)
}
/// Evaluates the `historical_stats` script for the given `dates`.
///
/// For every date (taken at midnight UTC) the processed key and the failed
/// key are passed to the script, in that order. The flat integer reply is
/// folded into `(processed, failed)` pairs; a trailing unpaired value is
/// ignored.
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn historical_stats(
    &self,
    queue: &str,
    dates: &[chrono::NaiveDate],
) -> Result<Vec<(i64, i64)>> {
    let mut conn = self.get_async_connection().await?;
    let script_keys: Vec<_> = dates
        .iter()
        .flat_map(|day| {
            let midnight = day.and_hms_opt(0, 0, 0).unwrap_or_default();
            let at = chrono::DateTime::<Utc>::from_naive_utc_and_offset(midnight, Utc);
            [keys::processed_key(queue, &at), keys::failed_key(queue, &at)]
        })
        .collect();
    let no_args = Vec::<RedisArg>::new();
    let counters: Vec<i64> = self
        .script_manager
        .eval_script(&mut conn, "historical_stats", &script_keys, &no_args)
        .await?;
    let pairs = counters
        .chunks_exact(2)
        .map(|pair| (pair[0], pair[1]))
        .collect();
    Ok(pairs)
}
pub async fn get_task_info(&self, queue: &str, task_id: &str) -> Result<TaskInfo> {
let mut conn = self.get_async_connection().await?;
if !self.queue_exists(queue).await? {
return Err(Error::other(format!("Queue '{queue}' does not exist")));
}
let now = Utc::now();
let keys = vec![keys::task_key(queue, task_id)];
let args = vec![
RedisArg::Str(task_id.to_string()),
RedisArg::Int(now.timestamp()),
RedisArg::Str(keys::queue_key_prefix(queue)),
];
let raw_result: Vec<redis::Value> = self
.script_manager
.eval_script(&mut conn, "get_task_info", &keys, &args)
.await?;
let [encoded, state, process_at, result]: [redis::Value; 4] =
raw_result.try_into().map_err(|_| {
Error::other("unexpected number of values returned from Lua script".to_string())
})?;
let encoded: Vec<u8> = redis::from_redis_value(encoded)
.map_err(|e| Error::other(format!("failed to parse encoded: {}", e)))?;
let state: String = redis::from_redis_value(state)
.map_err(|e| Error::other(format!("failed to parse state: {}", e)))?;
let process_at: i64 = redis::from_redis_value(process_at)
.map_err(|e| Error::other(format!("failed to parse process_at: {}", e)))?;
let result: String = redis::from_redis_value(result)
.map_err(|e| Error::other(format!("failed to parse result: {}", e)))?;
let task_msg = self.decode_task_message(&encoded)?;
let state = TaskState::from_str(&state)
.map_err(|_| Error::other("failed to parse TaskState".to_string()))?;
let result = if result.is_empty() {
None
} else {
Some(result.as_bytes().to_vec())
};
let process_at = if process_at == 0 {
None
} else {
DateTime::from_timestamp(process_at, 0)
};
Ok(TaskInfo::from_proto(&task_msg, state, process_at, result))
}
}
impl RedisBroker {
/// Evaluates the `group_stats` script for `queue`.
///
/// The flat reply is read two values at a time as `(name, size)` pairs; a
/// trailing unpaired value is ignored.
///
/// # Errors
/// Fails if a connection cannot be obtained, the script fails, or a reply
/// element cannot be converted to the expected type.
pub async fn group_stats(&self, queue: &str) -> Result<Vec<(String, i64)>> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::groups_key(queue)];
    let script_args = vec![RedisArg::Str(keys::group_key_prefix(queue))];
    let reply: Vec<redis::Value> = self
        .script_manager
        .eval_script(&mut conn, "group_stats", &script_keys, &script_args)
        .await?;
    let mut stats = Vec::new();
    let mut values = reply.into_iter();
    while let (Some(name_val), Some(size_val)) = (values.next(), values.next()) {
        let name: String = redis::from_redis_value(name_val)?;
        let size: i64 = redis::from_redis_value(size_val)?;
        stats.push((name, size));
    }
    Ok(stats)
}
}
impl RedisBroker {
/// Reports whether `queue` is a member of the `keys::ALL_QUEUES` set.
///
/// # Errors
/// Fails if a connection cannot be obtained or the `SISMEMBER` call fails.
async fn queue_exists(&self, queue: &str) -> Result<bool> {
    let mut redis_conn = self.get_async_connection().await?;
    let is_member: bool = redis_conn.sismember(keys::ALL_QUEUES, queue).await?;
    Ok(is_member)
}
/// Lists one page of tasks through the `list_messages` script.
///
/// The script receives the state's queue key plus the page start, page stop
/// and task-key prefix. Its reply is read as consecutive
/// `(encoded message, result)` byte pairs; an empty result becomes `None`
/// and a trailing unpaired element is ignored.
///
/// # Errors
/// Fails if a connection cannot be obtained, the script fails, or a message
/// cannot be decoded.
async fn list_messages(
    &self,
    queue: &str,
    state: TaskState,
    pagination: Pagination,
) -> Result<Vec<TaskInfo>> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![state.queue_key(queue, None)];
    let script_args = vec![
        RedisArg::Int(pagination.start()),
        RedisArg::Int(pagination.stop()),
        RedisArg::Str(keys::task_key_prefix(queue)),
    ];
    let reply: Vec<Vec<u8>> = self
        .script_manager
        .eval_script(&mut conn, "list_messages", &script_keys, &script_args)
        .await?;
    let mut tasks = Vec::new();
    for pair in reply.chunks_exact(2) {
        let (encoded, raw_result) = (&pair[0], &pair[1]);
        let message = self.decode_task_message(encoded)?;
        let task_result = (!raw_result.is_empty()).then(|| raw_result.clone());
        tasks.push(TaskInfo::from_proto(&message, state, None, task_result));
    }
    Ok(tasks)
}
/// Lists one page of tasks through the `list_zset_entries` script.
///
/// The reply is read three values at a time as
/// `(encoded message, score, result)`. A positive score is truncated to
/// whole seconds and used as the next-process time (falling back to the
/// default `DateTime` when it is not a valid timestamp); a non-positive
/// score yields `None`. An empty result becomes `None`. A trailing
/// incomplete triple is ignored.
///
/// # Errors
/// Fails if a connection cannot be obtained, the script fails, a reply
/// element has the wrong type, or a message cannot be decoded.
async fn list_zset_entries(
    &self,
    queue: &str,
    state: TaskState,
    pagination: Pagination,
) -> Result<Vec<TaskInfo>> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![state.queue_key(queue, None)];
    let script_args = vec![
        RedisArg::Int(pagination.start()),
        RedisArg::Int(pagination.stop()),
        RedisArg::Str(keys::task_key_prefix(queue)),
    ];
    let reply: Vec<redis::Value> = self
        .script_manager
        .eval_script(&mut conn, "list_zset_entries", &script_keys, &script_args)
        .await?;
    let mut tasks = Vec::new();
    let mut values = reply.into_iter();
    while let (Some(msg_val), Some(score_val), Some(res_val)) =
        (values.next(), values.next(), values.next())
    {
        let encoded: Vec<u8> = redis::from_redis_value(msg_val)?;
        let score: f64 = redis::from_redis_value(score_val)?;
        let next_process_at = (score > 0.0).then(|| {
            Utc.timestamp_opt(score as i64, 0)
                .single()
                .unwrap_or_default()
        });
        let raw_result: Vec<u8> = redis::from_redis_value(res_val)?;
        let task_result = (!raw_result.is_empty()).then_some(raw_result);
        let message = self.decode_task_message(&encoded)?;
        tasks.push(TaskInfo::from_proto(
            &message,
            state,
            next_process_at,
            task_result,
        ));
    }
    Ok(tasks)
}
}
impl RedisBroker {
/// Evaluates the `list_scheduler_keys` script against
/// `keys::ALL_SCHEDULERS`, passing the current Unix timestamp as the only
/// arg, and returns the strings it replies with.
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn list_scheduler_keys(&self) -> Result<Vec<String>> {
    let mut conn = self.get_async_connection().await?;
    let script_args = vec![RedisArg::Int(Utc::now().timestamp())];
    let script_keys = vec![keys::ALL_SCHEDULERS.to_string()];
    Ok(self
        .script_manager
        .eval_script::<Vec<String>>(&mut conn, "list_scheduler_keys", &script_keys, &script_args)
        .await?)
}
/// Lists one page of tasks of `queue` that are in `state`.
///
/// Pending and active tasks are listed via `list_messages`; every other
/// state goes through `list_zset_entries`. A negative page or a page size
/// below one yields an empty list.
///
/// # Errors
/// Fails if `queue` is not registered or if the underlying listing fails.
pub async fn list_tasks(
    &self,
    queue: &str,
    state: TaskState,
    pagination: Pagination,
) -> Result<Vec<TaskInfo>> {
    let known = self.queue_exists(queue).await?;
    if !known {
        return Err(Error::other(format!("Queue '{queue}' does not exist")));
    }
    if pagination.page < 0 || pagination.size < 1 {
        return Ok(Vec::new());
    }
    match state {
        TaskState::Pending | TaskState::Active => {
            self.list_messages(queue, state, pagination).await
        }
        _ => self.list_zset_entries(queue, state, pagination).await,
    }
}
}
impl RedisBroker {
pub async fn list_task_ids(&self, queue: &str, state: &TaskState) -> Result<Vec<String>> {
let key = format!("asynq:{queue}:{state}"); let mut conn = self.get_async_connection().await?;
let ids: Vec<String> = conn.zrange(&key, 0, -1).await?;
Ok(ids)
}
/// Returns every member of `keys::ALL_SERVERS`.
///
/// The key is read with `ZRANGE 0 -1`. `get_servers` and
/// `cleanup_dead_servers` in this file access the same key with
/// `ZRANGE`/`ZREM`, so the previous `SMEMBERS` call used the wrong data type
/// and would be rejected by Redis for a sorted-set key.
///
/// # Errors
/// Fails if a connection cannot be obtained or the `ZRANGE` call fails.
pub async fn list_server_ids(&self) -> Result<Vec<String>> {
    let mut conn = self.get_async_connection().await?;
    let ids: Vec<String> = conn.zrange(keys::ALL_SERVERS, 0, -1).await?;
    Ok(ids)
}
pub async fn get_task(
&self,
queue: &str,
state: TaskState,
task_id: &str,
) -> Result<Option<TaskMessage>> {
let key = format!("asynq:{queue}:{state}"); let mut conn = self.get_async_connection().await?;
let value: Option<Vec<u8>> = conn.zscore(&key, task_id).await?;
if let Some(bytes) = value {
let msg = TaskMessage::decode(&*bytes)?;
Ok(Some(msg))
} else {
Ok(None)
}
}
pub async fn get_queue_state_counts(
&self,
queue: &str,
) -> Result<std::collections::HashMap<String, i64>> {
let mut conn = self.get_async_connection().await?;
let mut counts = std::collections::HashMap::new();
for state in [
"pending",
"active",
"scheduled",
"retry",
"archived",
"completed",
] {
let key = format!("asynq:{queue}:{state}"); let count: i64 = conn.zcard(&key).await?;
counts.insert(state.to_string(), count);
}
Ok(counts)
}
/// Evaluates the `list_server_keys` script against `keys::ALL_SERVERS`,
/// passing the current Unix timestamp as the only arg, and returns the
/// strings it replies with.
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn list_server_keys(&self) -> Result<Vec<String>> {
    let mut conn = self.get_async_connection().await?;
    let script_args = vec![RedisArg::Int(Utc::now().timestamp())];
    let script_keys = vec![keys::ALL_SERVERS.to_string()];
    Ok(self
        .script_manager
        .eval_script::<Vec<String>>(&mut conn, "list_server_keys", &script_keys, &script_args)
        .await?)
}
/// Evaluates the `list_workers` script against the workers key derived from
/// `host`, `pid` and `server_id`, passing the current Unix timestamp as the
/// only arg, and returns the strings it replies with.
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn list_workers(&self, host: &str, pid: i32, server_id: &str) -> Result<Vec<String>> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::workers_key(host, pid, server_id)];
    let script_args = vec![RedisArg::Int(Utc::now().timestamp())];
    Ok(self
        .script_manager
        .eval_script::<Vec<String>>(&mut conn, "list_workers", &script_keys, &script_args)
        .await?)
}
/// Deletes a single task through the `delete_task` script.
///
/// Script reply codes: `1` success, `0` task not found, `-1` task is
/// active; any other value is reported as an unexpected result.
///
/// The queue prefix is now taken from `keys::queue_key_prefix`, as
/// `get_task_info` and `run_task` already do. The previous
/// `format!("asynq:{queue}:")` produced a prefix without literal braces
/// around the queue name and therefore differed from the helper-built keys
/// passed alongside it.
///
/// # Errors
/// Fails if a connection cannot be obtained, the script fails, or the script
/// reports anything other than success.
pub async fn delete_task(&self, queue: &str, task_id: &str) -> Result<()> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::task_key(queue, task_id), keys::groups_key(queue)];
    let script_args = vec![
        RedisArg::Str(task_id.to_string()),
        RedisArg::Str(keys::queue_key_prefix(queue)),
        RedisArg::Str(keys::group_key_prefix(queue)),
    ];
    let result: i64 = self
        .script_manager
        .eval_script(&mut conn, "delete_task", &script_keys, &script_args)
        .await?;
    match result {
        1 => Ok(()),
        0 => Err(Error::other("Task not found")),
        -1 => Err(Error::other("Cannot delete active task")),
        _ => Err(Error::other("Unexpected script result")),
    }
}
/// Evaluates the `delete_all` script on the archived key of `queue`, with
/// the task-key prefix as its only arg, and returns the script's integer
/// reply (presumably the number of tasks removed).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn delete_all_archived_tasks(&self, queue: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::archived_key(queue)];
    let script_args = vec![RedisArg::Str(keys::task_key_prefix(queue))];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "delete_all", &script_keys, &script_args)
        .await?)
}
/// Evaluates the `delete_all` script on the retry key of `queue`, with the
/// task-key prefix as its only arg, and returns the script's integer reply
/// (presumably the number of tasks removed).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn delete_all_retry_tasks(&self, queue: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::retry_key(queue)];
    let script_args = vec![RedisArg::Str(keys::task_key_prefix(queue))];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "delete_all", &script_keys, &script_args)
        .await?)
}
/// Evaluates the `delete_all` script on the scheduled key of `queue`, with
/// the task-key prefix as its only arg, and returns the script's integer
/// reply (presumably the number of tasks removed).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn delete_all_scheduled_tasks(&self, queue: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::scheduled_key(queue)];
    let script_args = vec![RedisArg::Str(keys::task_key_prefix(queue))];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "delete_all", &script_keys, &script_args)
        .await?)
}
/// Evaluates the `delete_all_pending` script on the pending key of `queue`,
/// with the task-key prefix as its only arg, and returns the script's
/// integer reply (presumably the number of tasks removed).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn delete_all_pending_tasks(&self, queue: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::pending_key(queue)];
    let script_args = vec![RedisArg::Str(keys::task_key_prefix(queue))];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "delete_all_pending", &script_keys, &script_args)
        .await?)
}
pub async fn delete_all_completed_tasks(&self, queue: &str) -> Result<i64> {
let mut conn = self.get_async_connection().await?;
let key = keys::completed_key(queue);
let count: i64 = conn.zcard(&key).await?;
conn.del::<_, ()>(&key).await?;
Ok(count)
}
/// Evaluates the `run_all` script with the archived key and the pending key
/// of `queue` (in that order) and the task-key prefix as its only arg.
/// Returns the script's integer reply (presumably the number of tasks
/// moved).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn requeue_all_archived_tasks(&self, queue: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::archived_key(queue), keys::pending_key(queue)];
    let script_args = vec![RedisArg::Str(keys::task_key_prefix(queue))];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "run_all", &script_keys, &script_args)
        .await?)
}
/// Evaluates the `run_all` script with the retry key and the pending key of
/// `queue` (in that order) and the task-key prefix as its only arg. Returns
/// the script's integer reply (presumably the number of tasks moved).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn requeue_all_retry_tasks(&self, queue: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::retry_key(queue), keys::pending_key(queue)];
    let script_args = vec![RedisArg::Str(keys::task_key_prefix(queue))];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "run_all", &script_keys, &script_args)
        .await?)
}
/// Evaluates the `run_all` script with the scheduled key and the pending key
/// of `queue` (in that order) and the task-key prefix as its only arg.
/// Returns the script's integer reply (presumably the number of tasks
/// moved).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn requeue_all_scheduled_tasks(&self, queue: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::scheduled_key(queue), keys::pending_key(queue)];
    let script_args = vec![RedisArg::Str(keys::task_key_prefix(queue))];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "run_all", &script_keys, &script_args)
        .await?)
}
pub async fn get_queue_stats(&self, queue: &str) -> Result<QueueStats> {
let mut conn = self.get_async_connection().await?;
let pending: i64 = conn.llen(keys::pending_key(queue)).await?;
let active: i64 = conn.llen(keys::active_key(queue)).await?;
let scheduled: i64 = conn.zcard(keys::scheduled_key(queue)).await?;
let retry: i64 = conn.zcard(keys::retry_key(queue)).await?;
let archived: i64 = conn.zcard(keys::archived_key(queue)).await?;
let completed: i64 = conn.zcard(keys::completed_key(queue)).await?;
let aggregating_pattern = format!("{}{}:*", keys::AGGREGATING_PREFIX, queue);
let aggregating_keys: Vec<String> = conn.keys(&aggregating_pattern).await?;
let mut aggregating = 0i64;
for key in aggregating_keys {
let count: i64 = conn.llen(&key).await?;
aggregating += count;
}
let daily_stats = Vec::new();
Ok(crate::task::QueueStats::new(
queue.to_string(),
active,
pending,
scheduled,
retry,
archived,
completed,
aggregating,
daily_stats,
))
}
/// Builds a `QueueInfo` snapshot for `queue`.
///
/// Sizes come from `get_queue_stats`. `memory_usage` is an estimate of 1024
/// per counted task (it is not measured), `latency` is always zero, and the
/// processed/failed counters are always reported as `0`. Counts are
/// narrowed with `as i32`.
///
/// # Errors
/// Fails if the stats, the group listing or the paused check fails.
pub async fn get_queue_info(&self, queue: &str) -> Result<QueueInfo> {
    let stats = self.get_queue_stats(queue).await?;
    let size = stats.active
        + stats.pending
        + stats.scheduled
        + stats.retry
        + stats.archived
        + stats.completed
        + stats.aggregating;
    let group_names = self.list_groups(queue).await?;
    let paused = self.is_queue_paused(queue).await?;
    let info = QueueInfo {
        queue: queue.to_string(),
        memory_usage: size * 1024,
        latency: Duration::ZERO,
        size: size as i32,
        groups: group_names.len() as i32,
        pending: stats.pending as i32,
        active: stats.active as i32,
        scheduled: stats.scheduled as i32,
        retry: stats.retry as i32,
        archived: stats.archived as i32,
        completed: stats.completed as i32,
        aggregating: stats.aggregating as i32,
        processed: 0,
        failed: 0,
        processed_total: 0,
        failed_total: 0,
        paused,
        timestamp: Utc::now(),
    };
    Ok(info)
}
/// Returns one `QueueStats` per distinct queue discovered via the `KEYS`
/// pattern `"{QUEUE_PREFIX}{QUEUE_START}*"`.
///
/// The queue name is extracted from every matching key. The previous
/// version computed stats once per matching *key*, so a queue with several
/// keys appeared several times in the result (and was queried repeatedly).
/// Names are now deduplicated in first-seen order, and empty names are
/// skipped as `get_queues` does.
///
/// # Errors
/// Fails if a connection cannot be obtained, the `KEYS` call fails, or the
/// stats of any queue cannot be collected.
pub async fn get_all_queue_stats(&self) -> Result<Vec<QueueStats>> {
    let mut conn = self.get_async_connection().await?;
    let queue_pattern = format!("{}{}*", keys::QUEUE_PREFIX, keys::QUEUE_START);
    let queue_keys: Vec<String> = conn.keys(&queue_pattern).await?;
    let mut seen = std::collections::HashSet::new();
    let mut all_stats = Vec::new();
    for queue_key in queue_keys {
        if let Some(base_name) = extract_queue_name_from_key(&queue_key) {
            // `insert` returns false for a name that was already handled.
            if base_name.is_empty() || !seen.insert(base_name.clone()) {
                continue;
            }
            all_stats.push(self.get_queue_stats(&base_name).await?);
        }
    }
    Ok(all_stats)
}
pub async fn get_queues(&self) -> Result<Vec<String>> {
let mut conn = self.get_async_connection().await?;
let queue_pattern = format!("{}{}*", keys::QUEUE_PREFIX, keys::QUEUE_START);
let queue_keys: Vec<String> = conn.keys(&queue_pattern).await?;
let mut queues = std::collections::HashSet::new();
for key in queue_keys {
if let Some(base_name) = extract_queue_name_from_key(&key) {
if !base_name.is_empty() {
queues.insert(base_name);
}
}
}
Ok(queues.into_iter().collect())
}
/// Marks `queue` as paused by setting its paused key to the current Unix
/// timestamp with `SETNX`.
///
/// The `SETNX` reply is discarded, so pausing an already paused queue is
/// not reported as an error and leaves the stored timestamp unchanged.
///
/// # Errors
/// Fails if a connection cannot be obtained or the `SETNX` call fails.
pub async fn pause_queue(&self, queue: &str) -> Result<()> {
    let mut conn = self.get_async_connection().await?;
    let paused_at = Utc::now().timestamp();
    let _newly_set: i32 = conn.set_nx(keys::paused_key(queue), paused_at).await?;
    Ok(())
}
/// Clears the paused marker of `queue` by deleting its paused key.
///
/// The `DEL` reply is discarded, so unpausing a queue that is not paused is
/// not reported as an error.
///
/// # Errors
/// Fails if a connection cannot be obtained or the `DEL` call fails.
pub async fn unpause_queue(&self, queue: &str) -> Result<()> {
    let mut conn = self.get_async_connection().await?;
    let _removed: i32 = conn.del(keys::paused_key(queue)).await?;
    Ok(())
}
/// Reports whether the paused key of `queue` exists.
///
/// # Errors
/// Fails if a connection cannot be obtained or the `EXISTS` call fails.
pub async fn is_queue_paused(&self, queue: &str) -> Result<bool> {
    let mut conn = self.get_async_connection().await?;
    let paused: bool = conn.exists(keys::paused_key(queue)).await?;
    Ok(paused)
}
/// Reads the `"result"` field of the hash stored at the task key of
/// `task_id`; returns `None` when the field (or the hash) is absent.
///
/// # Errors
/// Fails if a connection cannot be obtained or the `HGET` call fails.
pub async fn get_result(&self, queue: &str, task_id: &str) -> Result<Option<Vec<u8>>> {
    let mut conn = self.get_async_connection().await?;
    let task_key = keys::task_key(queue, task_id);
    let stored: Option<Vec<u8>> = conn.hget(&task_key, "result").await?;
    Ok(stored)
}
/// Checks `task` against its configured rate limit.
///
/// Returns `true` immediately when the task has no rate limit. Otherwise
/// the `rate_limit` script is evaluated with the limit's key and three args
/// (window in seconds, limit count, current Unix timestamp); the call
/// returns `true` only when the first element of the reply is `1`. An empty
/// reply yields `false`.
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn check_rate_limit(&self, task: &Task) -> Result<bool> {
    let Some(rate_limit) = &task.options.rate_limit else {
        return Ok(true);
    };
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![rate_limit.generate_key(&task.task_type, &task.options.queue)];
    let window_seconds = rate_limit.window.as_secs();
    let limit_count = rate_limit.limit;
    let script_args = vec![
        RedisArg::Int(window_seconds as i64),
        RedisArg::Int(limit_count as i64),
        RedisArg::Int(Utc::now().timestamp()),
    ];
    let reply: Vec<i64> = self
        .script_manager
        .eval_script(&mut conn, "rate_limit", &script_keys, &script_args)
        .await?;
    Ok(reply.first().copied() == Some(1))
}
/// Moves every entry of each queue's active key back onto its pending key
/// and returns the total number of entries moved.
///
/// For each queue the active entries are read, then removed from the active
/// key and pushed onto the pending key inside one atomic pipeline.
///
/// The active key is now accessed with list commands (`LRANGE`/`LREM`).
/// Elsewhere in this file it is a list (`LLEN` in `get_queue_stats`, and the
/// list-based script chosen for active tasks in `list_tasks`), so the
/// previous `SMEMBERS`/`SREM` calls used the wrong data type and would be
/// rejected by Redis.
///
/// NOTE(review): entries read here may change before the pipeline runs, and
/// no lease bookkeeping is touched — confirm this is only called when no
/// worker is processing these queues.
///
/// # Errors
/// Fails if a connection cannot be obtained or any Redis call fails.
pub async fn recover_orphaned_tasks(&self, queues: &[String]) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let mut recovered = 0i64;
    for queue in queues {
        let active_key = keys::active_key(queue);
        let pending_key = keys::pending_key(queue);
        let active_tasks: Vec<Vec<u8>> = conn.lrange(&active_key, 0, -1).await?;
        if active_tasks.is_empty() {
            continue;
        }
        let mut pipe = redis::pipe();
        pipe.atomic();
        for task_data in &active_tasks {
            pipe.lrem(&active_key, 1, task_data);
            pipe.lpush(&pending_key, task_data);
        }
        let _: Vec<i32> = pipe.query_async(&mut conn).await?;
        recovered += active_tasks.len() as i64;
    }
    Ok(recovered)
}
/// Takes up to `max_size` entries from the aggregating list of `group` and
/// returns the ones that decode as task messages.
///
/// Steps: read the range `0..=max_size - 1` of the list, queue one `LREM`
/// per decodable entry in an atomic pipeline, additionally queue a `DEL` of
/// the whole list when its current length does not exceed the number of
/// entries read, then execute the pipeline. Entries that fail to decode are
/// skipped without an error. Returns `None` when the list is empty or
/// nothing decodes.
///
/// NOTE(review): with `max_size == 0` the range end is `-1`, which `LRANGE`
/// treats as "through the last element" — confirm that "0 means unlimited"
/// is intended.
///
/// # Errors
/// Fails if a connection cannot be obtained or any Redis call fails.
pub async fn aggregate_group(
    &self,
    queue: &str,
    group: &str,
    max_size: usize,
) -> Result<Option<Vec<TaskMessage>>> {
    let mut conn = self.get_async_connection().await?;
    let list_key = keys::aggregating_key(queue, group);
    let fetched: Vec<Vec<u8>> = conn.lrange(&list_key, 0, max_size as isize - 1).await?;
    if fetched.is_empty() {
        return Ok(None);
    }

    let mut removal = redis::pipe();
    removal.atomic();
    let mut decoded = Vec::new();
    for raw in fetched.iter() {
        match self.decode_task_message(raw) {
            Ok(message) => {
                decoded.push(message);
                removal.lrem(&list_key, 1, raw);
            }
            // Undecodable entries are skipped.
            Err(_) => {}
        }
    }

    let list_len: i64 = conn.llen(&list_key).await?;
    if list_len <= fetched.len() as i64 {
        removal.del(&list_key);
    }
    let _: Vec<i32> = removal.query_async(&mut conn).await?;

    Ok((!decoded.is_empty()).then_some(decoded))
}
pub async fn unregister_server(&self, server_id: &str) -> Result<()> {
let parts: Vec<&str> = server_id.split(':').collect();
if parts.len() != 3 {
return Err(Error::other(format!(
"Invalid server_id format: {server_id}, expected hostname:pid:uuid"
)));
}
let hostname = parts[0];
let pid: i32 = parts[1]
.parse()
.map_err(|_| Error::other(format!("Invalid pid in server_id: {}", parts[1])))?;
let uuid = parts[2];
self.clear_server_state(hostname, pid, uuid, None).await
}
/// Records a heartbeat for `server_id`.
///
/// Writes the current Unix timestamp to the `"last_heartbeat"` field of the
/// hash at the legacy server-info key, then sets that key to expire in 3600
/// seconds.
///
/// # Errors
/// Fails if a connection cannot be obtained or either Redis call fails.
pub async fn heartbeat(&self, server_id: &str) -> Result<()> {
    let mut conn = self.get_async_connection().await?;
    let info_key = keys::server_info_key_legacy(server_id);
    let beat_at = Utc::now().timestamp();
    let _: () = conn.hset(&info_key, "last_heartbeat", beat_at).await?;
    let _: () = conn.expire(&info_key, 3600).await?;
    Ok(())
}
/// Returns the `ServerInfo` of every member of `keys::ALL_SERVERS` that can
/// be loaded.
///
/// Each member is passed to `get_server_info`; members for which that call
/// fails or returns `None` are silently left out.
///
/// # Errors
/// Fails if a connection cannot be obtained or the `ZRANGE` call fails.
pub async fn get_servers(&self) -> Result<Vec<ServerInfo>> {
    let mut conn = self.get_async_connection().await?;
    let members: Vec<String> = conn.zrange(keys::ALL_SERVERS, 0, -1).await?;
    let mut servers = Vec::new();
    for member in &members {
        match self.get_server_info(member).await {
            Ok(Some(info)) => servers.push(info),
            // Best effort: unreadable or missing entries are skipped.
            _ => {}
        }
    }
    Ok(servers)
}
/// Loads and decodes the `ServerInfo` stored at `server_key`.
///
/// Returns `None` when the key holds no value or when the stored bytes do
/// not decode as a `ServerInfo` (the decode error is discarded).
///
/// # Errors
/// Fails if a connection cannot be obtained or the `GET` call fails.
pub async fn get_server_info(&self, server_key: &str) -> Result<Option<ServerInfo>> {
    let mut conn = self.get_async_connection().await?;
    let stored: Option<Vec<u8>> = conn.get(server_key).await?;
    Ok(stored.and_then(|bytes| ServerInfo::decode(bytes.as_slice()).ok()))
}
/// Returns the processed/failed counters of `queue` for the last `days`
/// days, newest first (offset 0 is today, UTC). A non-positive `days`
/// yields an empty list.
///
/// Changes from the previous version:
/// * the reference time is captured once, so all entries are whole-day
///   offsets of the same instant (previously `Utc::now()` was re-read on
///   every iteration and could straddle midnight);
/// * a missing counter still reads as `0`, but real Redis failures
///   (connection errors, non-integer values) are now returned instead of
///   being silently reported as `0`.
///
/// # Errors
/// Fails if a connection cannot be obtained or any `GET` call fails.
pub async fn get_history(&self, queue: &str, days: i32) -> Result<Vec<DailyStats>> {
    let mut conn = self.get_async_connection().await?;
    let today = Utc::now();
    let mut history = Vec::new();
    for offset in 0..days {
        let date = today - chrono::Duration::days(i64::from(offset));
        let processed_key = keys::processed_key(queue, &date);
        let failed_key = keys::failed_key(queue, &date);
        // `Option` distinguishes "no counter stored" from a failed call.
        let processed: Option<i64> = conn.get(&processed_key).await?;
        let failed: Option<i64> = conn.get(&failed_key).await?;
        history.push(DailyStats {
            queue: queue.to_string(),
            processed: processed.unwrap_or(0),
            failed: failed.unwrap_or(0),
            date,
        });
    }
    Ok(history)
}
/// Removes registry entries of servers whose info key no longer exists and
/// returns how many were removed.
///
/// Every member of `keys::ALL_SERVERS` is parsed as `hostname:pid:uuid`;
/// members that do not have exactly three parts or whose pid is not an
/// `i32` are skipped. For each remaining member whose server-info key does
/// not exist, the member is removed from `ALL_SERVERS` and its workers key
/// is removed from `ALL_WORKERS`.
///
/// Note: the cutoff derived from `timeout` is computed but not used by the
/// current logic.
///
/// # Errors
/// Fails if a connection cannot be obtained or any Redis call fails.
pub async fn cleanup_dead_servers(&self, timeout: Duration) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let _cutoff_time = Utc::now().timestamp() - timeout.as_secs() as i64;
    let members: Vec<String> = conn.zrange(keys::ALL_SERVERS, 0, -1).await?;
    let mut removed = 0i64;
    for member in members {
        let parts: Vec<&str> = member.split(':').collect();
        let [hostname, pid_text, uuid] = parts.as_slice() else {
            continue;
        };
        let Ok(pid) = pid_text.parse::<i32>() else {
            continue;
        };
        let info_key = keys::server_info_key(hostname, pid, uuid);
        let workers_key = keys::workers_key(hostname, pid, uuid);
        let alive: bool = conn.exists(&info_key).await?;
        if alive {
            continue;
        }
        let _: i32 = conn.zrem(keys::ALL_SERVERS, &member).await?;
        let _: i32 = conn.zrem(keys::ALL_WORKERS, &workers_key).await?;
        removed += 1;
    }
    Ok(removed)
}
pub async fn archive_task(&self, queue: &str, task_id: &str) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let task_info = self.get_task_info(queue, task_id).await?;
let task_key = keys::task_key(queue, task_id);
let archived_key = keys::archived_key(queue);
let current_time = Utc::now().timestamp();
let source_key = match task_info.state {
TaskState::Pending => keys::pending_key(queue),
TaskState::Scheduled => keys::scheduled_key(queue),
TaskState::Retry => keys::retry_key(queue),
TaskState::Active => keys::active_key(queue),
TaskState::Aggregating => {
if let Some(group) = &task_info.group {
keys::group_key(queue, group)
} else {
return Err(Error::other("Aggregating task must have group"));
}
}
TaskState::Archived => return Ok(()), TaskState::Completed => return Err(Error::other("Cannot archive completed task")),
};
let keys = if task_info.state == TaskState::Active {
vec![task_key, source_key, archived_key, keys::lease_key(queue)]
} else {
vec![task_key, source_key, archived_key]
};
let args = vec![
RedisArg::Str(task_id.to_string()),
RedisArg::Int(current_time),
];
let result: i64 = self
.script_manager
.eval_script(&mut conn, "archive_task", &keys, &args)
.await?;
if result == 0 {
return Err(Error::other(
"Failed to archive task - task may not exist in expected state",
));
}
Ok(())
}
/// Evaluates the `archive_all_pending` script with the pending key and the
/// archived key of `queue` (in that order) and two args: the task-key
/// prefix and the current Unix timestamp. Returns the script's integer
/// reply (presumably the number of tasks archived).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn archive_all_pending_tasks(&self, queue: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::pending_key(queue), keys::archived_key(queue)];
    let script_args = vec![
        RedisArg::Str(keys::task_key_prefix(queue)),
        RedisArg::Int(Utc::now().timestamp()),
    ];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "archive_all_pending", &script_keys, &script_args)
        .await?)
}
/// Evaluates the `archive_all` script with the retry key and the archived
/// key of `queue` (in that order) and two args: the task-key prefix and the
/// current Unix timestamp. Returns the script's integer reply (presumably
/// the number of tasks archived).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn archive_all_retry_tasks(&self, queue: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::retry_key(queue), keys::archived_key(queue)];
    let script_args = vec![
        RedisArg::Str(keys::task_key_prefix(queue)),
        RedisArg::Int(Utc::now().timestamp()),
    ];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "archive_all", &script_keys, &script_args)
        .await?)
}
/// Evaluates the `archive_all` script with the scheduled key and the
/// archived key of `queue` (in that order) and two args: the task-key
/// prefix and the current Unix timestamp. Returns the script's integer
/// reply (presumably the number of tasks archived).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn archive_all_scheduled_tasks(&self, queue: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::scheduled_key(queue), keys::archived_key(queue)];
    let script_args = vec![
        RedisArg::Str(keys::task_key_prefix(queue)),
        RedisArg::Int(Utc::now().timestamp()),
    ];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "archive_all", &script_keys, &script_args)
        .await?)
}
/// Archives the aggregating tasks of every group of `queue`.
///
/// For each group returned by `list_groups`, the `archive_all_aggregating`
/// script is evaluated with three keys (group key, archived key,
/// all-aggregation-sets key) and three args (task-key prefix, current Unix
/// timestamp, group name). The integer replies are summed and returned.
/// The first failing group aborts the loop.
///
/// # Errors
/// Fails if a connection cannot be obtained, the group listing fails, or
/// any script evaluation fails.
pub async fn archive_all_aggregating_tasks(&self, queue: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let mut total_archived = 0;
    for group in self.list_groups(queue).await? {
        let script_keys = vec![
            keys::group_key(queue, &group),
            keys::archived_key(queue),
            keys::all_aggregation_sets(queue),
        ];
        let script_args = vec![
            RedisArg::Str(keys::task_key_prefix(queue)),
            RedisArg::Int(Utc::now().timestamp()),
            RedisArg::Str(group),
        ];
        let archived: i64 = self
            .script_manager
            .eval_script(&mut conn, "archive_all_aggregating", &script_keys, &script_args)
            .await?;
        total_archived += archived;
    }
    Ok(total_archived)
}
pub async fn run_task(&self, queue: &str, task_id: &str) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let keys = vec![
keys::task_key(queue, task_id),
keys::pending_key(queue),
keys::all_groups(queue),
];
let args = vec![
RedisArg::Str(task_id.to_string()),
RedisArg::Str(keys::queue_key_prefix(queue)),
RedisArg::Str(keys::group_key_prefix(queue)),
];
let result: i64 = self
.script_manager
.eval_script(&mut conn, "run_task", &keys, &args)
.await?;
match result {
1 => Ok(()),
0 => Err(Error::other("Task not found")),
-1 => Err(Error::other("Task is in active state")),
-2 => Err(Error::other("Task is already in pending state")),
_ => Err(Error::other("Unexpected script result")),
}
}
/// Removes the state keys of `queue` and unregisters it.
///
/// Unless `force` is set, the call is refused while the active key is
/// non-empty. The pending, active, scheduled, retry, archived, completed,
/// paused and lease keys are then deleted and the queue is removed from
/// `keys::ALL_QUEUES`.
///
/// Fixes over the previous version:
/// * the active check used `HLEN`, although the active key is a list
///   elsewhere in this file (`LLEN` in `get_queue_stats`); it now uses
///   `LLEN`;
/// * the queue stayed registered in `ALL_QUEUES`, so `all_queues` and
///   `queue_exists` kept reporting a removed queue.
///
/// NOTE(review): data stored under the task-key prefix and group keys is
/// not deleted here — confirm whether that cleanup happens elsewhere.
///
/// # Errors
/// Fails if a connection cannot be obtained, any Redis call fails, or the
/// queue has active tasks and `force` is `false`.
pub async fn remove_queue(&self, queue: &str, force: bool) -> Result<()> {
    let mut conn = self.get_async_connection().await?;
    if !force {
        let active_count: i64 = conn.llen(keys::active_key(queue)).await?;
        if active_count > 0 {
            return Err(Error::other(
                "Queue has active tasks. Use force=true to remove anyway.",
            ));
        }
    }
    let keys_to_remove = [
        keys::pending_key(queue),
        keys::active_key(queue),
        keys::scheduled_key(queue),
        keys::retry_key(queue),
        keys::archived_key(queue),
        keys::completed_key(queue),
        keys::paused_key(queue),
        keys::lease_key(queue),
    ];
    for key in &keys_to_remove {
        conn.del::<_, ()>(key).await?;
    }
    // Unregister last, so a failure above leaves the queue discoverable.
    conn.srem::<_, _, ()>(keys::ALL_QUEUES, queue).await?;
    Ok(())
}
/// Returns the members of the all-aggregation-sets key of `queue` that
/// start with `"{group}:"`.
///
/// The prefix is built once instead of once per member, and the unused
/// `_pattern` local of the previous version has been dropped; the result is
/// unchanged.
///
/// # Errors
/// Fails if a connection cannot be obtained or the `SMEMBERS` call fails.
pub async fn get_aggregation_sets(&self, queue: &str, group: &str) -> Result<Vec<String>> {
    let mut conn = self.get_async_connection().await?;
    let key = keys::all_aggregation_sets(queue);
    let all_sets: Vec<String> = conn.smembers(&key).await?;
    let prefix = format!("{group}:");
    let filtered_sets = all_sets
        .into_iter()
        .filter(|set_name| set_name.starts_with(&prefix))
        .collect();
    Ok(filtered_sets)
}
/// Evaluates the `delete_all_aggregating` script with the group key and the
/// groups key of `queue` (in that order) and two args: the task-key prefix
/// and the group name. Returns the script's integer reply (presumably the
/// number of tasks removed).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn delete_all_aggregating_tasks(&self, queue: &str, group: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![keys::group_key(queue, group), keys::groups_key(queue)];
    let script_args = vec![
        RedisArg::Str(keys::task_key_prefix(queue)),
        RedisArg::Str(group.to_string()),
    ];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "delete_all_aggregating", &script_keys, &script_args)
        .await?)
}
/// Evaluates the `run_all_aggregating` script with the group key, the
/// pending key and the groups key of `queue` (in that order) and two args:
/// the task-key prefix and the group name. Returns the script's integer
/// reply (presumably the number of tasks moved).
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn run_all_aggregating_tasks(&self, queue: &str, group: &str) -> Result<i64> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![
        keys::group_key(queue, group),
        keys::pending_key(queue),
        keys::groups_key(queue),
    ];
    let script_args = vec![
        RedisArg::Str(keys::task_key_prefix(queue)),
        RedisArg::Str(group.to_string()),
    ];
    Ok(self
        .script_manager
        .eval_script::<i64>(&mut conn, "run_all_aggregating", &script_keys, &script_args)
        .await?)
}
/// Evaluates the `requeue` script for task `task_id`.
///
/// The script receives four keys, in order: the active key, the lease key,
/// the pending key and the task key; the task id is its only arg. The
/// script's reply is discarded.
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn requeue_active_task(&self, queue: &str, task_id: &str) -> Result<()> {
    let mut conn = self.get_async_connection().await?;
    let script_keys = vec![
        keys::active_key(queue),
        keys::lease_key(queue),
        keys::pending_key(queue),
        keys::task_key(queue, task_id),
    ];
    let script_args = vec![RedisArg::Str(task_id.to_string())];
    self.script_manager
        .eval_script::<()>(&mut conn, "requeue", &script_keys, &script_args)
        .await?;
    Ok(())
}
/// Deletes one aggregation set through the `delete_aggregation_set` script.
///
/// The script receives the aggregation-set key and the all-aggregation-sets
/// key, plus the task-key prefix as its only arg.
///
/// Keys are now derived from the `keys` helpers. The previous
/// `format!("asynq:{queue}:g:{group}:{set_id}")` and
/// `format!("asynq:{queue}:t:")` interpolated `queue` without any literal
/// braces, so they did not share a layout with the helper-built
/// all-aggregation-sets key passed in the same call.
///
/// NOTE(review): assumes `keys::group_key_prefix` ends with `g:` so that
/// the set key keeps the `<prefix><group>:<set_id>` shape — confirm.
///
/// # Errors
/// Fails if a connection cannot be obtained or the script evaluation fails.
pub async fn del_aggregation_set(&self, queue: &str, group: &str, set_id: &str) -> Result<()> {
    let mut conn = self.get_async_connection().await?;
    let aggregation_set_key = format!("{}{group}:{set_id}", keys::group_key_prefix(queue));
    let script_keys = vec![aggregation_set_key, keys::all_aggregation_sets(queue)];
    let script_args = vec![RedisArg::Str(keys::task_key_prefix(queue))];
    self.script_manager
        .eval_script::<()>(&mut conn, "delete_aggregation_set", &script_keys, &script_args)
        .await?;
    Ok(())
}
}