cool_task/
queue.rs

1//! 队列实现
2
3use crate::job::{Job, JobOptions, JobResult, JobStatus};
4use redis::aio::MultiplexedConnection;
5use redis::AsyncCommands;
6
/// Handle to a named job queue stored in Redis.
///
/// `Clone` is derived, so each clone carries its own copy of the name, the
/// prefix and the connection handle.
#[derive(Clone)]
pub struct Queue {
    /// Queue name; the second segment of every Redis key built by `key`.
    name: String,
    /// Key prefix; the first segment of every Redis key built by `key`.
    prefix: String,
    /// Multiplexed async Redis connection used for every command issued by the queue.
    conn: MultiplexedConnection,
}
17
18impl Queue {
19    /// 创建队列
20    pub async fn new(name: &str, prefix: &str, redis_url: &str) -> JobResult<Self> {
21        let client = redis::Client::open(redis_url)?;
22        let conn = client.get_multiplexed_async_connection().await?;
23        Ok(Self {
24            name: name.to_string(),
25            prefix: prefix.to_string(),
26            conn,
27        })
28    }
29
30    /// 获取 Redis key
31    fn key(&self, suffix: &str) -> String {
32        format!("{}:{}:{}", self.prefix, self.name, suffix)
33    }
34
35    /// 添加任务
36    pub async fn add(
37        &self,
38        name: &str,
39        data: serde_json::Value,
40        options: JobOptions,
41    ) -> JobResult<Job> {
42        let job = Job::new(&self.name, name, data, options);
43        let job_json = serde_json::to_string(&job)?;
44
45        let mut conn = self.conn.clone();
46
47        // 存储任务数据
48        let job_key = self.key(&format!("job:{}", job.id));
49        conn.set::<_, _, ()>(&job_key, &job_json).await?;
50
51        // 根据是否延迟决定放入哪个队列
52        if let Some(delay) = job.options.delay {
53            let score = chrono::Utc::now().timestamp_millis() + delay as i64;
54            conn.zadd::<_, _, _, ()>(self.key("delayed"), &job.id, score)
55                .await?;
56        } else {
57            // 按优先级放入等待队列
58            let score = -job.options.priority as f64;
59            conn.zadd::<_, _, _, ()>(self.key("waiting"), &job.id, score)
60                .await?;
61        }
62
63        Ok(job)
64    }
65
66    /// 批量添加任务
67    pub async fn add_bulk(
68        &self,
69        jobs: Vec<(String, serde_json::Value, JobOptions)>,
70    ) -> JobResult<Vec<Job>> {
71        let mut results = Vec::new();
72        for (name, data, options) in jobs {
73            let job = self.add(&name, data, options).await?;
74            results.push(job);
75        }
76        Ok(results)
77    }
78
79    /// 获取任务
80    pub async fn get_job(&self, job_id: &str) -> JobResult<Option<Job>> {
81        let mut conn = self.conn.clone();
82        let job_key = self.key(&format!("job:{}", job_id));
83        let job_json: Option<String> = conn.get(&job_key).await?;
84
85        match job_json {
86            Some(json) => {
87                let job: Job = serde_json::from_str(&json)?;
88                Ok(Some(job))
89            }
90            None => Ok(None),
91        }
92    }
93
94    /// 更新任务
95    pub async fn update_job(&self, job: &Job) -> JobResult<()> {
96        let mut conn = self.conn.clone();
97        let job_key = self.key(&format!("job:{}", job.id));
98        let job_json = serde_json::to_string(job)?;
99        conn.set::<_, _, ()>(&job_key, &job_json).await?;
100        Ok(())
101    }
102
103    /// 获取下一个待处理任务
104    pub async fn get_next_job(&self) -> JobResult<Option<Job>> {
105        let mut conn = self.conn.clone();
106
107        // 先检查延迟队列
108        let now = chrono::Utc::now().timestamp_millis();
109        let delayed_key = self.key("delayed");
110        let waiting_key = self.key("waiting");
111
112        // 移动到期的延迟任务到等待队列
113        let delayed_jobs: Vec<String> = conn.zrangebyscore(&delayed_key, 0, now).await?;
114
115        for job_id in delayed_jobs {
116            conn.zrem::<_, _, ()>(&delayed_key, &job_id).await?;
117            conn.zadd::<_, _, _, ()>(&waiting_key, &job_id, 0).await?;
118        }
119
120        // 从等待队列获取任务
121        let job_ids: Vec<String> = conn.zrange(&waiting_key, 0, 0).await?;
122
123        if let Some(job_id) = job_ids.first() {
124            // 移除并移到活跃队列
125            conn.zrem::<_, _, ()>(&waiting_key, job_id).await?;
126            conn.sadd::<_, _, ()>(self.key("active"), job_id).await?;
127
128            // 获取任务详情
129            if let Some(mut job) = self.get_job(job_id).await? {
130                job.mark_active();
131                self.update_job(&job).await?;
132                return Ok(Some(job));
133            }
134        }
135
136        Ok(None)
137    }
138
139    /// 完成任务
140    pub async fn complete_job(
141        &self,
142        job: &mut Job,
143        result: Option<serde_json::Value>,
144    ) -> JobResult<()> {
145        let mut conn = self.conn.clone();
146
147        job.mark_completed(result);
148        self.update_job(job).await?;
149
150        // 从活跃队列移除
151        conn.srem::<_, _, ()>(self.key("active"), &job.id).await?;
152        // 添加到完成队列
153        conn.zadd::<_, _, _, ()>(
154            self.key("completed"),
155            &job.id,
156            chrono::Utc::now().timestamp_millis(),
157        )
158        .await?;
159
160        Ok(())
161    }
162
163    /// 失败任务
164    pub async fn fail_job(&self, job: &mut Job, error: &str) -> JobResult<()> {
165        let mut conn = self.conn.clone();
166
167        job.mark_failed(error);
168
169        if job.can_retry() {
170            // 放回等待队列重试
171            job.status = JobStatus::Waiting;
172            self.update_job(job).await?;
173            conn.srem::<_, _, ()>(self.key("active"), &job.id).await?;
174            conn.zadd::<_, _, _, ()>(self.key("waiting"), &job.id, 0)
175                .await?;
176        } else {
177            // 移到失败队列
178            self.update_job(job).await?;
179            conn.srem::<_, _, ()>(self.key("active"), &job.id).await?;
180            conn.zadd::<_, _, _, ()>(
181                self.key("failed"),
182                &job.id,
183                chrono::Utc::now().timestamp_millis(),
184            )
185            .await?;
186        }
187
188        Ok(())
189    }
190
191    /// 暂停队列
192    pub async fn pause(&self) -> JobResult<()> {
193        let mut conn = self.conn.clone();
194        conn.set::<_, _, ()>(self.key("paused"), "1").await?;
195        Ok(())
196    }
197
198    /// 恢复队列
199    pub async fn resume(&self) -> JobResult<()> {
200        let mut conn = self.conn.clone();
201        conn.del::<_, ()>(self.key("paused")).await?;
202        Ok(())
203    }
204
205    /// 检查队列是否暂停
206    pub async fn is_paused(&self) -> JobResult<bool> {
207        let mut conn = self.conn.clone();
208        let paused: Option<String> = conn.get(self.key("paused")).await?;
209        Ok(paused.is_some())
210    }
211
212    /// 获取队列长度
213    pub async fn count(&self, status: JobStatus) -> JobResult<u64> {
214        let mut conn = self.conn.clone();
215        let key = match status {
216            JobStatus::Waiting => self.key("waiting"),
217            JobStatus::Active => self.key("active"),
218            JobStatus::Completed => self.key("completed"),
219            JobStatus::Failed => self.key("failed"),
220            JobStatus::Delayed => self.key("delayed"),
221            _ => return Ok(0),
222        };
223
224        let count: u64 = if status == JobStatus::Active {
225            conn.scard(&key).await?
226        } else {
227            conn.zcard(&key).await?
228        };
229
230        Ok(count)
231    }
232
233    /// 清空队列
234    pub async fn obliterate(&self) -> JobResult<()> {
235        let mut conn = self.conn.clone();
236        let pattern = format!("{}:{}:*", self.prefix, self.name);
237        let keys: Vec<String> = conn.keys(&pattern).await?;
238        if !keys.is_empty() {
239            conn.del::<_, ()>(keys).await?;
240        }
241        Ok(())
242    }
243}