1use crate::job::{Job, JobOptions, JobResult, JobStatus};
4use redis::aio::MultiplexedConnection;
5use redis::AsyncCommands;
6
7#[derive(Clone)]
9pub struct Queue {
10 name: String,
12 prefix: String,
14 conn: MultiplexedConnection,
16}
17
18impl Queue {
19 pub async fn new(name: &str, prefix: &str, redis_url: &str) -> JobResult<Self> {
21 let client = redis::Client::open(redis_url)?;
22 let conn = client.get_multiplexed_async_connection().await?;
23 Ok(Self {
24 name: name.to_string(),
25 prefix: prefix.to_string(),
26 conn,
27 })
28 }
29
30 fn key(&self, suffix: &str) -> String {
32 format!("{}:{}:{}", self.prefix, self.name, suffix)
33 }
34
35 pub async fn add(
37 &self,
38 name: &str,
39 data: serde_json::Value,
40 options: JobOptions,
41 ) -> JobResult<Job> {
42 let job = Job::new(&self.name, name, data, options);
43 let job_json = serde_json::to_string(&job)?;
44
45 let mut conn = self.conn.clone();
46
47 let job_key = self.key(&format!("job:{}", job.id));
49 conn.set::<_, _, ()>(&job_key, &job_json).await?;
50
51 if let Some(delay) = job.options.delay {
53 let score = chrono::Utc::now().timestamp_millis() + delay as i64;
54 conn.zadd::<_, _, _, ()>(self.key("delayed"), &job.id, score)
55 .await?;
56 } else {
57 let score = -job.options.priority as f64;
59 conn.zadd::<_, _, _, ()>(self.key("waiting"), &job.id, score)
60 .await?;
61 }
62
63 Ok(job)
64 }
65
66 pub async fn add_bulk(
68 &self,
69 jobs: Vec<(String, serde_json::Value, JobOptions)>,
70 ) -> JobResult<Vec<Job>> {
71 let mut results = Vec::new();
72 for (name, data, options) in jobs {
73 let job = self.add(&name, data, options).await?;
74 results.push(job);
75 }
76 Ok(results)
77 }
78
79 pub async fn get_job(&self, job_id: &str) -> JobResult<Option<Job>> {
81 let mut conn = self.conn.clone();
82 let job_key = self.key(&format!("job:{}", job_id));
83 let job_json: Option<String> = conn.get(&job_key).await?;
84
85 match job_json {
86 Some(json) => {
87 let job: Job = serde_json::from_str(&json)?;
88 Ok(Some(job))
89 }
90 None => Ok(None),
91 }
92 }
93
94 pub async fn update_job(&self, job: &Job) -> JobResult<()> {
96 let mut conn = self.conn.clone();
97 let job_key = self.key(&format!("job:{}", job.id));
98 let job_json = serde_json::to_string(job)?;
99 conn.set::<_, _, ()>(&job_key, &job_json).await?;
100 Ok(())
101 }
102
103 pub async fn get_next_job(&self) -> JobResult<Option<Job>> {
105 let mut conn = self.conn.clone();
106
107 let now = chrono::Utc::now().timestamp_millis();
109 let delayed_key = self.key("delayed");
110 let waiting_key = self.key("waiting");
111
112 let delayed_jobs: Vec<String> = conn.zrangebyscore(&delayed_key, 0, now).await?;
114
115 for job_id in delayed_jobs {
116 conn.zrem::<_, _, ()>(&delayed_key, &job_id).await?;
117 conn.zadd::<_, _, _, ()>(&waiting_key, &job_id, 0).await?;
118 }
119
120 let job_ids: Vec<String> = conn.zrange(&waiting_key, 0, 0).await?;
122
123 if let Some(job_id) = job_ids.first() {
124 conn.zrem::<_, _, ()>(&waiting_key, job_id).await?;
126 conn.sadd::<_, _, ()>(self.key("active"), job_id).await?;
127
128 if let Some(mut job) = self.get_job(job_id).await? {
130 job.mark_active();
131 self.update_job(&job).await?;
132 return Ok(Some(job));
133 }
134 }
135
136 Ok(None)
137 }
138
139 pub async fn complete_job(
141 &self,
142 job: &mut Job,
143 result: Option<serde_json::Value>,
144 ) -> JobResult<()> {
145 let mut conn = self.conn.clone();
146
147 job.mark_completed(result);
148 self.update_job(job).await?;
149
150 conn.srem::<_, _, ()>(self.key("active"), &job.id).await?;
152 conn.zadd::<_, _, _, ()>(
154 self.key("completed"),
155 &job.id,
156 chrono::Utc::now().timestamp_millis(),
157 )
158 .await?;
159
160 Ok(())
161 }
162
163 pub async fn fail_job(&self, job: &mut Job, error: &str) -> JobResult<()> {
165 let mut conn = self.conn.clone();
166
167 job.mark_failed(error);
168
169 if job.can_retry() {
170 job.status = JobStatus::Waiting;
172 self.update_job(job).await?;
173 conn.srem::<_, _, ()>(self.key("active"), &job.id).await?;
174 conn.zadd::<_, _, _, ()>(self.key("waiting"), &job.id, 0)
175 .await?;
176 } else {
177 self.update_job(job).await?;
179 conn.srem::<_, _, ()>(self.key("active"), &job.id).await?;
180 conn.zadd::<_, _, _, ()>(
181 self.key("failed"),
182 &job.id,
183 chrono::Utc::now().timestamp_millis(),
184 )
185 .await?;
186 }
187
188 Ok(())
189 }
190
191 pub async fn pause(&self) -> JobResult<()> {
193 let mut conn = self.conn.clone();
194 conn.set::<_, _, ()>(self.key("paused"), "1").await?;
195 Ok(())
196 }
197
198 pub async fn resume(&self) -> JobResult<()> {
200 let mut conn = self.conn.clone();
201 conn.del::<_, ()>(self.key("paused")).await?;
202 Ok(())
203 }
204
205 pub async fn is_paused(&self) -> JobResult<bool> {
207 let mut conn = self.conn.clone();
208 let paused: Option<String> = conn.get(self.key("paused")).await?;
209 Ok(paused.is_some())
210 }
211
212 pub async fn count(&self, status: JobStatus) -> JobResult<u64> {
214 let mut conn = self.conn.clone();
215 let key = match status {
216 JobStatus::Waiting => self.key("waiting"),
217 JobStatus::Active => self.key("active"),
218 JobStatus::Completed => self.key("completed"),
219 JobStatus::Failed => self.key("failed"),
220 JobStatus::Delayed => self.key("delayed"),
221 _ => return Ok(0),
222 };
223
224 let count: u64 = if status == JobStatus::Active {
225 conn.scard(&key).await?
226 } else {
227 conn.zcard(&key).await?
228 };
229
230 Ok(count)
231 }
232
233 pub async fn obliterate(&self) -> JobResult<()> {
235 let mut conn = self.conn.clone();
236 let pattern = format!("{}:{}:*", self.prefix, self.name);
237 let keys: Vec<String> = conn.keys(&pattern).await?;
238 if !keys.is_empty() {
239 conn.del::<_, ()>(keys).await?;
240 }
241 Ok(())
242 }
243}