cool_task/
base.rs

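//! Queue base type.
//!
//! Mirrors the TypeScript `BaseCoolQueue` and offers a small set of easy-to-use, high-level APIs:
//! - `add` / `add_bulk`
//! - `pause` / `resume` / `is_paused`
//! - `start_worker` / `stop_worker`
//! - `count`
//! - `obliterate`
//!
//! Usage example: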
9//!
10//! ```rust,ignore
11//! use cool_task::prelude::*;
12//!
13//! pub struct EmailQueue {
14//!     inner: BaseQueue,
15//! }
16//!
17//! impl EmailQueue {
18//!     pub async fn new(config: TaskConfig) -> JobResult<Self> {
19//!         let base = BaseQueue::new("email", config).await?;
20//!         Ok(Self { inner: base })
21//!     }
22//!
23//!     pub async fn send(&self, to: &str, subject: &str, body: &str) -> JobResult<Job> {
24//!         let data = serde_json::json!({ "to": to, "subject": subject, "body": body });
25//!         self.inner.add("send", data, JobOptions::default()).await
26//!     }
27//! }
28//! ```
29
30use crate::job::{Job, JobOptions, JobResult, JobStatus};
31use crate::queue::Queue;
32use crate::worker::{Worker, WorkerConfig};
33use crate::TaskConfig;
34use std::sync::Arc;
35
36/// 队列基类
37///
38/// 封装 `Queue` 和 `Worker`,提供更贴近 TS `BaseCoolQueue` 的使用体验。
39pub struct BaseQueue {
40    /// 队列名称
41    pub name: String,
42    /// 队列前缀
43    pub prefix: String,
44    /// 任务配置
45    pub task_config: TaskConfig,
46    /// 底层队列
47    pub queue: Arc<Queue>,
48    /// 可选 Worker
49    pub worker: Option<Worker>,
50}
51
52impl BaseQueue {
53    /// 创建新的队列基类实例,并根据配置自动创建 Queue/Worker
54    pub async fn new(name: &str, config: TaskConfig) -> JobResult<Self> {
55        let queue = Queue::new(name, &config.prefix, &config.redis_url).await?;
56        let worker_cfg = WorkerConfig {
57            concurrency: config.concurrency,
58            ..Default::default()
59        };
60        let worker = Worker::new(queue.clone(), worker_cfg);
61
62        Ok(Self {
63            name: name.to_string(),
64            prefix: config.prefix.clone(),
65            task_config: config,
66            queue: Arc::new(queue),
67            worker: Some(worker),
68        })
69    }
70
71    /// 只创建生产者(无 Worker)
72    pub async fn producer_only(name: &str, config: TaskConfig) -> JobResult<Self> {
73        let queue = Queue::new(name, &config.prefix, &config.redis_url).await?;
74        Ok(Self {
75            name: name.to_string(),
76            prefix: config.prefix.clone(),
77            task_config: config,
78            queue: Arc::new(queue),
79            worker: None,
80        })
81    }
82
83    /// 启动 Worker(如果存在)
84    pub async fn start_worker(&self) {
85        if let Some(worker) = &self.worker {
86            worker.start().await;
87        }
88    }
89
90    /// 停止 Worker(如果存在)
91    pub fn stop_worker(&self) {
92        if let Some(worker) = &self.worker {
93            worker.stop();
94        }
95    }
96
97    /// 发送单个任务
98    pub async fn add(
99        &self,
100        name: &str,
101        data: serde_json::Value,
102        options: JobOptions,
103    ) -> JobResult<Job> {
104        self.queue.add(name, data, options).await
105    }
106
107    /// 批量发送任务
108    pub async fn add_bulk(
109        &self,
110        jobs: Vec<(String, serde_json::Value, JobOptions)>,
111    ) -> JobResult<Vec<Job>> {
112        self.queue.add_bulk(jobs).await
113    }
114
115    /// 暂停队列
116    pub async fn pause(&self) -> JobResult<()> {
117        self.queue.pause().await
118    }
119
120    /// 恢复队列
121    pub async fn resume(&self) -> JobResult<()> {
122        self.queue.resume().await
123    }
124
125    /// 队列是否暂停
126    pub async fn is_paused(&self) -> JobResult<bool> {
127        self.queue.is_paused().await
128    }
129
130    /// 获取各状态任务数量
131    pub async fn count(&self, status: JobStatus) -> JobResult<u64> {
132        self.queue.count(status).await
133    }
134
135    /// 清空队列(包括任务数据)
136    pub async fn obliterate(&self) -> JobResult<()> {
137        self.queue.obliterate().await
138    }
139}