use crate::backend::pagination::Pagination;
use crate::backend::rdb::redis::RedisConnectionType;
use crate::backend::rdb::RedisBroker;
use crate::base::keys::TaskState;
use crate::base::Broker;
use crate::error::Result;
use crate::proto::ServerInfo;
use crate::task::{DailyStats, TaskInfo};
use std::sync::Arc;
pub struct RedisInspector {
rdb: Arc<RedisBroker>,
}
impl RedisInspector {
pub async fn new(redis_connection_config: RedisConnectionType) -> Result<Self> {
let broker = RedisBroker::new(redis_connection_config).await?;
Ok(Self {
rdb: Arc::new(broker),
})
}
pub fn from_broker(broker: Arc<RedisBroker>) -> Self {
Self { rdb: broker }
}
pub async fn list_active_tasks(&self, queue: &str) -> Result<Vec<TaskInfo>> {
self
.rdb
.list_tasks(queue, TaskState::Active, Pagination::default())
.await
}
pub async fn list_pending_tasks(&self, queue: &str) -> Result<Vec<TaskInfo>> {
self
.rdb
.list_tasks(queue, TaskState::Pending, Pagination::default())
.await
}
pub async fn list_pending_tasks_with_pagination(
&self,
queue: &str,
pagination: Pagination,
) -> Result<Vec<TaskInfo>> {
if pagination.page < 1 || pagination.size <= 0 {
return Ok(vec![]);
}
self
.rdb
.list_tasks(queue, TaskState::Pending, pagination)
.await
}
pub async fn list_scheduled_tasks(&self, queue: &str) -> Result<Vec<TaskInfo>> {
self
.rdb
.list_tasks(queue, TaskState::Scheduled, Pagination::default())
.await
}
pub async fn list_retry_tasks(&self, queue: &str) -> Result<Vec<TaskInfo>> {
self
.rdb
.list_tasks(queue, TaskState::Retry, Pagination::default())
.await
}
pub async fn list_archived_tasks(&self, queue: &str) -> Result<Vec<TaskInfo>> {
self
.rdb
.list_tasks(queue, TaskState::Archived, Pagination::default())
.await
}
pub async fn list_completed_tasks(&self, queue: &str) -> Result<Vec<TaskInfo>> {
self
.rdb
.list_tasks(queue, TaskState::Completed, Pagination::default())
.await
}
pub async fn list_aggregating_tasks(&self, queue: &str) -> Result<Vec<TaskInfo>> {
self
.rdb
.list_tasks(queue, TaskState::Aggregating, Pagination::default())
.await
}
pub async fn run_all_archived_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.requeue_all_archived_tasks(queue).await
}
pub async fn run_all_retry_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.requeue_all_retry_tasks(queue).await
}
pub async fn run_all_scheduled_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.requeue_all_scheduled_tasks(queue).await
}
pub async fn history(&self, queue: &str, days: i32) -> Result<Vec<DailyStats>> {
self.rdb.get_history(queue, days).await
}
pub async fn delete_queue(&self, queue: &str) -> Result<()> {
let _ = self.rdb.delete_all_pending_tasks(queue).await?;
let _ = self.delete_all_active_tasks(queue).await;
let _ = self.rdb.delete_all_scheduled_tasks(queue).await?;
let _ = self.rdb.delete_all_retry_tasks(queue).await?;
let _ = self.rdb.delete_all_archived_tasks(queue).await?;
let _ = self.rdb.delete_expired_completed_tasks(queue).await?;
Ok(())
}
pub async fn delete_all_active_tasks(&self, _queue: &str) -> Result<i64> {
Err(crate::error::Error::NotImplemented(
"delete_all_active_tasks not implemented - active tasks are being processed".to_string(),
))
}
}
#[async_trait::async_trait]
impl crate::inspector::InspectorTrait for RedisInspector {
async fn get_queue_stats(&self, queue: &str) -> Result<crate::task::QueueStats> {
self.rdb.get_queue_stats(queue).await
}
async fn get_queue_info(&self, queue: &str) -> Result<crate::task::QueueInfo> {
self.rdb.get_queue_info(queue).await
}
async fn get_all_queue_stats(&self) -> Result<Vec<crate::task::QueueStats>> {
self.rdb.get_all_queue_stats().await
}
async fn get_queues(&self) -> Result<Vec<String>> {
self.rdb.get_queues().await
}
async fn get_groups(&self, queue: &str) -> Result<Vec<String>> {
self.rdb.list_groups(queue).await
}
async fn list_tasks(
&self,
queue: &str,
state: TaskState,
pagination: Pagination,
) -> Result<Vec<crate::task::TaskInfo>> {
self.rdb.list_tasks(queue, state, pagination).await
}
async fn get_task_info(&self, queue: &str, task_id: &str) -> Result<crate::task::TaskInfo> {
self.rdb.get_task_info(queue, task_id).await
}
async fn delete_task(&self, queue: &str, task_id: &str) -> Result<()> {
self.rdb.delete_task(queue, task_id).await
}
async fn delete_all_archived_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.delete_all_archived_tasks(queue).await
}
async fn delete_all_retry_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.delete_all_retry_tasks(queue).await
}
async fn delete_all_scheduled_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.delete_all_scheduled_tasks(queue).await
}
async fn delete_all_pending_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.delete_all_pending_tasks(queue).await
}
async fn requeue_all_archived_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.requeue_all_archived_tasks(queue).await
}
async fn requeue_all_retry_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.requeue_all_retry_tasks(queue).await
}
async fn requeue_all_scheduled_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.requeue_all_scheduled_tasks(queue).await
}
async fn run_task(&self, queue: &str, task_id: &str) -> Result<()> {
self.rdb.run_task(queue, task_id).await
}
async fn archive_task(&self, queue: &str, task_id: &str) -> Result<()> {
self.rdb.archive_task(queue, task_id).await
}
async fn archive_all_pending_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.archive_all_pending_tasks(queue).await
}
async fn archive_all_retry_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.archive_all_retry_tasks(queue).await
}
async fn archive_all_scheduled_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.archive_all_scheduled_tasks(queue).await
}
async fn archive_all_aggregating_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.archive_all_aggregating_tasks(queue).await
}
async fn pause_queue(&self, queue: &str) -> Result<()> {
self.rdb.pause_queue(queue).await
}
async fn unpause_queue(&self, queue: &str) -> Result<()> {
self.rdb.unpause_queue(queue).await
}
async fn is_queue_paused(&self, queue: &str) -> Result<bool> {
self.rdb.is_queue_paused(queue).await
}
async fn get_task_result(&self, queue: &str, task_id: &str) -> Result<Option<Vec<u8>>> {
self.rdb.get_result(queue, task_id).await
}
async fn delete_expired_completed_tasks(&self, queue: &str) -> Result<i64> {
self.rdb.delete_expired_completed_tasks(queue).await
}
async fn get_servers(&self) -> Result<Vec<ServerInfo>> {
self.rdb.get_servers().await
}
async fn get_server_info(&self, server_id: &str) -> Result<Option<ServerInfo>> {
self.rdb.get_server_info(server_id).await
}
async fn get_history(&self, queue: &str, days: i32) -> Result<Vec<DailyStats>> {
self.rdb.get_history(queue, days).await
}
async fn cancel_processing(&self, task_id: &str) -> Result<()> {
self.rdb.publish_cancellation(task_id).await
}
}