pub mod builder;
use crate::{
storage::{Keys, RedisClient},
task::{TaskStatus, Task},
progress::TaskProgress,
Error, Result,
};
use fred::prelude::RedisKey;
use rmp_serde;
use serde::Deserialize;
pub use builder::{Client, ClientBuilder};
/// Read-only inspector over the task system's Redis state.
///
/// Exposes queries for individual tasks, per-queue counters, registered
/// queues/workers, and best-effort task progress. It never mutates queue
/// contents (the one write, [`Inspector::stop_worker`], only deletes a
/// heartbeat key).
pub struct Inspector {
// Shared Redis connection used for every inspection query.
redis: RedisClient,
}
impl Inspector {
pub fn new(redis: RedisClient) -> Self {
Self { redis }
}
pub async fn get_task(&self, task_id: &str) -> Result<TaskInfo> {
let key: RedisKey = Keys::task(task_id).into();
let data = self.redis.hget(key.clone(), "data".into()).await?
.ok_or(Error::TaskNotFound(task_id.to_string()))?;
let bytes = data.as_bytes().ok_or_else(|| {
Error::Serialization("Task data is not bytes".to_string())
})?;
let task: Task = rmp_serde::from_slice(bytes)
.map_err(|e| Error::Serialization(e.to_string()))?;
let progress = self.get_task_progress(task_id).await.ok().flatten();
Ok(TaskInfo {
id: task.id,
task_type: task.task_type,
queue: task.queue,
status: task.status,
retry_cnt: task.retry_cnt,
last_error: task.last_error,
created_at: task.created_at,
enqueued_at: task.enqueued_at,
processed_at: task.processed_at,
progress,
})
}
/// Collects the counters for a single queue.
///
/// `pending`/`active`/`dead` come from list lengths, `delayed`/`retried`
/// from the scheduled sorted sets, and `completed` from the queue's stats
/// hash. The `completed` counter is best-effort: a missing field, a read
/// error, or an unparsable value is reported as 0.
pub async fn queue_stats(&self, queue_name: &str) -> Result<QueueStats> {
    let queue_key: RedisKey = Keys::queue(queue_name).into();
    let active_key: RedisKey = Keys::active(queue_name).into();
    let delayed_key: RedisKey = Keys::delayed(queue_name).into();
    let retry_key: RedisKey = Keys::retry(queue_name).into();
    let dead_key: RedisKey = Keys::dead(queue_name).into();

    let pending = self.redis.llen(queue_key).await?;
    let active = self.redis.llen(active_key).await?;
    // NOTE(review): this transfers every member just to count them; if the
    // client wrapper ever exposes ZCOUNT, prefer it to avoid the O(n) reply.
    let delayed = self.redis.zrangebyscore(delayed_key, 0, i64::MAX).await?.len() as u64;
    let retry = self.redis.zrangebyscore(retry_key, 0, i64::MAX).await?.len() as u64;
    let dead = self.redis.llen(dead_key).await?;

    let stats_key: RedisKey = Keys::stats(queue_name).into();
    let field_key: RedisKey = "processed".into();
    let completed = match self.redis.hget(stats_key, field_key).await {
        Ok(Some(value)) => value.as_string().and_then(|s| s.parse().ok()).unwrap_or(0),
        _ => 0,
    };

    Ok(QueueStats {
        name: queue_name.to_string(),
        pending,
        active,
        // `delayed`/`retry` are already u64 (cast above); the old
        // `as u64` re-casts were no-ops and have been removed.
        delayed,
        retried: retry,
        dead,
        completed,
    })
}
/// Lists the names of every queue recorded in the queue registry set.
pub async fn list_queues(&self) -> Result<Vec<String>> {
    let registry: RedisKey = Keys::meta_queues().into();
    Ok(self.redis.smembers(registry).await?)
}
/// Returns up to `limit` pending tasks from the head of `queue_name`.
///
/// Tasks whose blobs cannot be fetched or decoded are skipped silently,
/// so the result may be shorter than the queue itself.
pub async fn list_tasks(&self, queue_name: &str, limit: usize) -> Result<Vec<TaskInfo>> {
    let queue_key: RedisKey = Keys::queue(queue_name).into();
    let available = self.redis.llen(queue_key.clone()).await?;
    let wanted = available.min(limit as u64) as usize;
    if wanted == 0 {
        return Ok(Vec::new());
    }
    // LRANGE's end index is inclusive, hence `wanted - 1`.
    let ids = self.redis.lrange(queue_key, 0, wanted as i64 - 1).await?;
    let mut found = Vec::with_capacity(ids.len());
    for id in &ids {
        if let Ok(info) = self.get_task(id).await {
            found.push(info);
        }
    }
    Ok(found)
}
/// Lists metadata for every worker in the worker registry set.
///
/// Workers whose metadata is missing or undecodable are skipped silently.
pub async fn list_workers(&self) -> Result<Vec<WorkerInfo>> {
    let registry: RedisKey = Keys::meta_workers().into();
    let ids = self.redis.smembers(registry).await?;
    let mut found = Vec::with_capacity(ids.len());
    for id in &ids {
        if let Ok(info) = self.get_worker(id).await {
            found.push(info);
        }
    }
    Ok(found)
}
pub async fn get_worker(&self, worker_id: &str) -> Result<WorkerInfo> {
let worker_key: RedisKey = Keys::meta_worker(worker_id).into();
let data = self.redis.get(worker_key).await?;
if let Some(data) = data {
let bytes = data.as_bytes()
.ok_or_else(|| Error::Serialization("Worker data is not bytes".into()))?;
let metadata: crate::server::worker::WorkerMetadata = rmp_serde::from_slice(bytes)
.map_err(|e| Error::Serialization(e.to_string()))?;
Ok(WorkerInfo {
id: metadata.id,
server_name: metadata.server_name,
queues: metadata.queues,
started_at: metadata.started_at,
last_heartbeat: metadata.last_heartbeat,
processed_total: metadata.processed_total,
status: metadata.status,
})
} else {
Err(Error::TaskNotFound(format!("Worker {} not found", worker_id)))
}
}
/// Signals a worker to stop by deleting its heartbeat key.
///
/// Returns `true` when a heartbeat existed and was deleted, `false` when
/// there was nothing to delete.
///
/// NOTE(review): the exists/del pair is not atomic — a concurrent deletion
/// between the two calls is still reported as `true`. Confirm whether the
/// wrapper's `del` return value could be used instead.
pub async fn stop_worker(&self, worker_id: &str) -> Result<bool> {
    let heartbeat_key: RedisKey = Keys::meta_heartbeat(worker_id).into();
    if !self.redis.exists(heartbeat_key.clone()).await? {
        return Ok(false);
    }
    self.redis.del(vec![heartbeat_key]).await?;
    Ok(true)
}
/// Reads the progress hash for a task, if one exists.
///
/// The reply is a flat `[field, value, field, value, ...]` sequence.
/// Recognized fields are `current`, `message`, and `updated_at`; unknown
/// fields and unparsable numbers fall back to defaults. Any Redis error
/// (and an empty hash) yields `Ok(None)`, so callers can treat progress as
/// strictly best-effort.
pub async fn get_task_progress(&self, task_id: &str) -> Result<Option<TaskProgress>> {
    let key: RedisKey = Keys::progress(task_id).into();
    match self.redis.hgetall(key).await {
        Ok(values) if !values.is_empty() => {
            let mut current = 0u32;
            let mut message: Option<String> = None;
            let mut updated_at = 0i64;
            // Walk the flat reply two entries at a time; a trailing field
            // with no value is discarded, as before. (The previous
            // `.peekable()` adaptor was never peeked and has been removed.)
            let mut iter = values.iter();
            while let (Some(field), Some(value)) = (iter.next(), iter.next()) {
                if let Some(field_name) = field.as_str() {
                    // `as_ref` turns the Cow field name into &str for matching.
                    match field_name.as_ref() {
                        "current" => {
                            if let Some(val_str) = value.as_str() {
                                current = val_str.parse().unwrap_or(0);
                            }
                        }
                        "message" => {
                            message = value.as_string();
                        }
                        "updated_at" => {
                            if let Some(val_str) = value.as_str() {
                                updated_at = val_str.parse().unwrap_or(0);
                            }
                        }
                        _ => {}
                    }
                }
            }
            Ok(Some(TaskProgress {
                current,
                message,
                updated_at,
            }))
        }
        _ => Ok(None),
    }
}
}
/// Snapshot of a single task as assembled by [`Inspector::get_task`].
#[derive(Debug, Clone, Deserialize)]
pub struct TaskInfo {
// Unique task id.
pub id: String,
// Registered task-type name.
pub task_type: String,
// Queue the task belongs to.
pub queue: String,
// Current lifecycle status.
pub status: TaskStatus,
// Number of retries performed so far.
pub retry_cnt: u32,
// Last failure message, if any.
pub last_error: Option<String>,
// Timestamps (i64; presumably Unix epoch — TODO confirm units with Task).
pub created_at: i64,
pub enqueued_at: Option<i64>,
pub processed_at: Option<i64>,
// Latest progress report; None when absent or unreadable (best-effort).
#[serde(default)]
pub progress: Option<TaskProgress>,
}
/// Counter snapshot for one queue, produced by [`Inspector::queue_stats`].
#[derive(Debug, Clone)]
pub struct QueueStats {
// Queue name the counters were collected for.
pub name: String,
// Tasks waiting in the main list.
pub pending: u64,
// Tasks currently being processed.
pub active: u64,
// Tasks scheduled for later execution.
pub delayed: u64,
// Tasks waiting in the retry set.
pub retried: u64,
// Tasks in the dead-letter list.
pub dead: u64,
// Lifetime processed counter from the stats hash (0 when unavailable).
pub completed: u64,
}
/// Worker metadata as decoded by [`Inspector::get_worker`].
#[derive(Debug, Clone)]
pub struct WorkerInfo {
// Unique worker id.
pub id: String,
// Name of the server the worker runs on.
pub server_name: String,
// Queues this worker consumes from.
pub queues: Vec<String>,
// Timestamps (i64; presumably Unix epoch — TODO confirm with WorkerMetadata).
pub started_at: i64,
pub last_heartbeat: i64,
// Total tasks processed by this worker.
pub processed_total: u64,
// Free-form status string reported by the worker.
pub status: String,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `QueueStats` is a plain data holder; verify that construction
    /// preserves every counter (the old test only asserted two fields).
    #[test]
    fn test_queue_stats_default() {
        let stats = QueueStats {
            name: "default".to_string(),
            pending: 10,
            active: 2,
            delayed: 5,
            retried: 1,
            dead: 0,
            completed: 100,
        };
        assert_eq!(stats.name, "default");
        assert_eq!(stats.pending, 10);
        assert_eq!(stats.active, 2);
        assert_eq!(stats.delayed, 5);
        assert_eq!(stats.retried, 1);
        assert_eq!(stats.dead, 0);
        assert_eq!(stats.completed, 100);
    }
}