1use crate::{Error, JobPayload, QueueConfig};
4use chrono::{DateTime, Utc};
5use redis::aio::ConnectionManager;
6use redis::AsyncCommands;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use tracing::debug;
10
11#[derive(Debug, Clone, Serialize, Deserialize, Default)]
13pub struct QueueStats {
14 pub queues: Vec<SingleQueueStats>,
16 pub total_failed: usize,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SingleQueueStats {
23 pub name: String,
25 pub pending: usize,
27 pub delayed: usize,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct JobInfo {
34 pub id: String,
36 pub job_type: String,
38 pub queue: String,
40 pub attempts: u32,
42 pub max_retries: u32,
44 pub created_at: DateTime<Utc>,
46 pub available_at: DateTime<Utc>,
48 pub state: JobState,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54#[serde(rename_all = "snake_case")]
55pub enum JobState {
56 Pending,
57 Delayed,
58 Failed,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct FailedJobInfo {
64 pub job: JobInfo,
66 pub error: String,
68 pub failed_at: DateTime<Utc>,
70}
71
72#[derive(Debug, Deserialize)]
74struct StoredFailedJob {
75 payload: JobPayload,
76 error: String,
77 failed_at: DateTime<Utc>,
78}
79
80#[derive(Clone)]
82pub struct QueueConnection {
83 conn: ConnectionManager,
85 config: Arc<QueueConfig>,
87}
88
89impl QueueConnection {
90 pub async fn new(config: QueueConfig) -> Result<Self, Error> {
92 let client = redis::Client::open(config.redis_url.as_str())
93 .map_err(|e| Error::ConnectionFailed(e.to_string()))?;
94
95 let conn = ConnectionManager::new(client)
96 .await
97 .map_err(|e| Error::ConnectionFailed(e.to_string()))?;
98
99 Ok(Self {
100 conn,
101 config: Arc::new(config),
102 })
103 }
104
105 pub fn config(&self) -> &QueueConfig {
107 &self.config
108 }
109
110 pub async fn push(&self, payload: JobPayload) -> Result<(), Error> {
112 let queue = &payload.queue;
113 let json = payload.to_json()?;
114
115 if payload.is_available() {
116 let key = self.config.queue_key(queue);
118 self.conn
119 .clone()
120 .lpush::<_, _, ()>(&key, &json)
121 .await
122 .map_err(Error::Redis)?;
123
124 debug!(queue = queue, job_id = %payload.id, "Job pushed to queue");
125 } else {
126 let key = self.config.delayed_key(queue);
128 let score = payload.available_at.timestamp() as f64;
129 self.conn
130 .clone()
131 .zadd::<_, _, _, ()>(&key, &json, score)
132 .await
133 .map_err(Error::Redis)?;
134
135 debug!(
136 queue = queue,
137 job_id = %payload.id,
138 available_at = %payload.available_at,
139 "Job pushed to delayed queue"
140 );
141 }
142
143 Ok(())
144 }
145
146 pub async fn pop(&self, queue: &str) -> Result<Option<JobPayload>, Error> {
148 let key = self.config.queue_key(queue);
149 let timeout = self.config.block_timeout.as_secs() as f64;
150
151 let result: Option<(String, String)> = self
153 .conn
154 .clone()
155 .brpop(&key, timeout)
156 .await
157 .map_err(Error::Redis)?;
158
159 match result {
160 Some((_, json)) => {
161 let mut payload = JobPayload::from_json(&json)?;
162 payload.reserve();
163 Ok(Some(payload))
164 }
165 None => Ok(None),
166 }
167 }
168
169 pub async fn pop_nowait(&self, queue: &str) -> Result<Option<JobPayload>, Error> {
171 let key = self.config.queue_key(queue);
172
173 let result: Option<String> = self
174 .conn
175 .clone()
176 .rpop(&key, None)
177 .await
178 .map_err(Error::Redis)?;
179
180 match result {
181 Some(json) => {
182 let mut payload = JobPayload::from_json(&json)?;
183 payload.reserve();
184 Ok(Some(payload))
185 }
186 None => Ok(None),
187 }
188 }
189
190 pub async fn migrate_delayed(&self, queue: &str) -> Result<usize, Error> {
192 let delayed_key = self.config.delayed_key(queue);
193 let queue_key = self.config.queue_key(queue);
194 let now = chrono::Utc::now().timestamp() as f64;
195
196 let ready_jobs: Vec<String> = self
198 .conn
199 .clone()
200 .zrangebyscore(&delayed_key, "-inf", now)
201 .await
202 .map_err(Error::Redis)?;
203
204 let count = ready_jobs.len();
205
206 for job in ready_jobs {
207 self.conn
209 .clone()
210 .zrem::<_, _, ()>(&delayed_key, &job)
211 .await
212 .map_err(Error::Redis)?;
213
214 self.conn
216 .clone()
217 .lpush::<_, _, ()>(&queue_key, &job)
218 .await
219 .map_err(Error::Redis)?;
220 }
221
222 if count > 0 {
223 debug!(queue = queue, count = count, "Migrated delayed jobs");
224 }
225
226 Ok(count)
227 }
228
229 pub async fn release(
231 &self,
232 mut payload: JobPayload,
233 delay: std::time::Duration,
234 ) -> Result<(), Error> {
235 payload.increment_attempts();
236 payload.reserved_at = None;
237
238 if delay.is_zero() {
239 payload.available_at = chrono::Utc::now();
240 } else {
241 payload.available_at =
242 chrono::Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
243 }
244
245 self.push(payload).await
246 }
247
248 pub async fn fail(&self, payload: JobPayload, error: &Error) -> Result<(), Error> {
250 let failed_key = self.config.failed_key();
251
252 #[derive(serde::Serialize)]
253 struct FailedJob {
254 payload: JobPayload,
255 error: String,
256 failed_at: chrono::DateTime<chrono::Utc>,
257 }
258
259 let failed = FailedJob {
260 payload,
261 error: error.to_string(),
262 failed_at: chrono::Utc::now(),
263 };
264
265 let json = serde_json::to_string(&failed)
266 .map_err(|e| Error::SerializationFailed(e.to_string()))?;
267
268 self.conn
269 .clone()
270 .lpush::<_, _, ()>(&failed_key, &json)
271 .await
272 .map_err(Error::Redis)?;
273
274 Ok(())
275 }
276
277 pub async fn size(&self, queue: &str) -> Result<usize, Error> {
279 let key = self.config.queue_key(queue);
280 let len: usize = self.conn.clone().llen(&key).await.map_err(Error::Redis)?;
281 Ok(len)
282 }
283
284 pub async fn delayed_size(&self, queue: &str) -> Result<usize, Error> {
286 let key = self.config.delayed_key(queue);
287 let len: usize = self.conn.clone().zcard(&key).await.map_err(Error::Redis)?;
288 Ok(len)
289 }
290
291 pub async fn clear(&self, queue: &str) -> Result<(), Error> {
293 let queue_key = self.config.queue_key(queue);
294 let delayed_key = self.config.delayed_key(queue);
295
296 self.conn
297 .clone()
298 .del::<_, ()>(&queue_key)
299 .await
300 .map_err(Error::Redis)?;
301 self.conn
302 .clone()
303 .del::<_, ()>(&delayed_key)
304 .await
305 .map_err(Error::Redis)?;
306
307 Ok(())
308 }
309
310 pub async fn get_pending_jobs(&self, queue: &str, limit: usize) -> Result<Vec<JobInfo>, Error> {
312 let key = self.config.queue_key(queue);
313 let jobs: Vec<String> = self
314 .conn
315 .clone()
316 .lrange(&key, 0, limit as isize - 1)
317 .await
318 .map_err(Error::Redis)?;
319
320 let mut result = Vec::with_capacity(jobs.len());
321 for json in jobs {
322 if let Ok(payload) = JobPayload::from_json(&json) {
323 result.push(JobInfo {
324 id: payload.id.to_string(),
325 job_type: payload.job_type,
326 queue: payload.queue,
327 attempts: payload.attempts,
328 max_retries: payload.max_retries,
329 created_at: payload.created_at,
330 available_at: payload.available_at,
331 state: JobState::Pending,
332 });
333 }
334 }
335 Ok(result)
336 }
337
338 pub async fn get_delayed_jobs(&self, queue: &str, limit: usize) -> Result<Vec<JobInfo>, Error> {
340 let key = self.config.delayed_key(queue);
341 let jobs: Vec<String> = self
342 .conn
343 .clone()
344 .zrange(&key, 0, limit as isize - 1)
345 .await
346 .map_err(Error::Redis)?;
347
348 let mut result = Vec::with_capacity(jobs.len());
349 for json in jobs {
350 if let Ok(payload) = JobPayload::from_json(&json) {
351 result.push(JobInfo {
352 id: payload.id.to_string(),
353 job_type: payload.job_type,
354 queue: payload.queue,
355 attempts: payload.attempts,
356 max_retries: payload.max_retries,
357 created_at: payload.created_at,
358 available_at: payload.available_at,
359 state: JobState::Delayed,
360 });
361 }
362 }
363 Ok(result)
364 }
365
366 pub async fn get_failed_jobs(&self, limit: usize) -> Result<Vec<FailedJobInfo>, Error> {
368 let key = self.config.failed_key();
369 let jobs: Vec<String> = self
370 .conn
371 .clone()
372 .lrange(&key, 0, limit as isize - 1)
373 .await
374 .map_err(Error::Redis)?;
375
376 let mut result = Vec::with_capacity(jobs.len());
377 for json in jobs {
378 if let Ok(failed) = serde_json::from_str::<StoredFailedJob>(&json) {
379 result.push(FailedJobInfo {
380 job: JobInfo {
381 id: failed.payload.id.to_string(),
382 job_type: failed.payload.job_type,
383 queue: failed.payload.queue,
384 attempts: failed.payload.attempts,
385 max_retries: failed.payload.max_retries,
386 created_at: failed.payload.created_at,
387 available_at: failed.payload.available_at,
388 state: JobState::Failed,
389 },
390 error: failed.error,
391 failed_at: failed.failed_at,
392 });
393 }
394 }
395 Ok(result)
396 }
397
398 pub async fn failed_count(&self) -> Result<usize, Error> {
400 let key = self.config.failed_key();
401 let len: usize = self.conn.clone().llen(&key).await.map_err(Error::Redis)?;
402 Ok(len)
403 }
404
405 pub async fn get_stats(&self, queues: &[&str]) -> Result<QueueStats, Error> {
407 let mut stats = QueueStats::default();
408
409 for queue in queues {
410 let pending = self.size(queue).await?;
411 let delayed = self.delayed_size(queue).await?;
412 stats.queues.push(SingleQueueStats {
413 name: queue.to_string(),
414 pending,
415 delayed,
416 });
417 }
418
419 stats.total_failed = self.failed_count().await?;
420 Ok(stats)
421 }
422}
423
424pub struct Queue;
426
427impl Queue {
428 pub fn connection() -> &'static QueueConnection {
430 GLOBAL_CONNECTION
431 .get()
432 .expect("Queue not initialized. Call Queue::init() first.")
433 }
434
435 pub async fn init(config: QueueConfig) -> Result<(), Error> {
437 let conn = QueueConnection::new(config).await?;
438 GLOBAL_CONNECTION
439 .set(conn)
440 .map_err(|_| Error::custom("Queue already initialized"))?;
441 Ok(())
442 }
443
444 pub fn is_initialized() -> bool {
446 GLOBAL_CONNECTION.get().is_some()
447 }
448}
449
450static GLOBAL_CONNECTION: std::sync::OnceLock<QueueConnection> = std::sync::OnceLock::new();
451
452#[cfg(test)]
453mod tests {
454 use super::*;
455
456 #[test]
457 fn test_queue_stats_default() {
458 let stats = QueueStats::default();
459 assert!(stats.queues.is_empty());
460 assert_eq!(stats.total_failed, 0);
461 }
462
463 #[test]
464 fn test_queue_stats_serialization() {
465 let stats = QueueStats {
466 queues: vec![
467 SingleQueueStats {
468 name: "default".to_string(),
469 pending: 5,
470 delayed: 2,
471 },
472 SingleQueueStats {
473 name: "emails".to_string(),
474 pending: 10,
475 delayed: 0,
476 },
477 ],
478 total_failed: 3,
479 };
480
481 let json = serde_json::to_string(&stats).unwrap();
482 let restored: QueueStats = serde_json::from_str(&json).unwrap();
483
484 assert_eq!(restored.queues.len(), 2);
485 assert_eq!(restored.queues[0].name, "default");
486 assert_eq!(restored.queues[0].pending, 5);
487 assert_eq!(restored.queues[1].name, "emails");
488 assert_eq!(restored.total_failed, 3);
489 }
490
491 #[test]
492 fn test_single_queue_stats_clone() {
493 let stats = SingleQueueStats {
494 name: "test".to_string(),
495 pending: 10,
496 delayed: 5,
497 };
498
499 let cloned = stats.clone();
500 assert_eq!(cloned.name, stats.name);
501 assert_eq!(cloned.pending, stats.pending);
502 assert_eq!(cloned.delayed, stats.delayed);
503 }
504
505 #[test]
506 fn test_job_state_serialization() {
507 assert_eq!(
508 serde_json::to_string(&JobState::Pending).unwrap(),
509 "\"pending\""
510 );
511 assert_eq!(
512 serde_json::to_string(&JobState::Delayed).unwrap(),
513 "\"delayed\""
514 );
515 assert_eq!(
516 serde_json::to_string(&JobState::Failed).unwrap(),
517 "\"failed\""
518 );
519 }
520
521 #[test]
522 fn test_job_state_deserialization() {
523 let pending: JobState = serde_json::from_str("\"pending\"").unwrap();
524 let delayed: JobState = serde_json::from_str("\"delayed\"").unwrap();
525 let failed: JobState = serde_json::from_str("\"failed\"").unwrap();
526
527 assert!(matches!(pending, JobState::Pending));
528 assert!(matches!(delayed, JobState::Delayed));
529 assert!(matches!(failed, JobState::Failed));
530 }
531
532 #[test]
533 fn test_job_info_serialization() {
534 let now = Utc::now();
535 let job_info = JobInfo {
536 id: "job-123".to_string(),
537 job_type: "SendEmailJob".to_string(),
538 queue: "emails".to_string(),
539 attempts: 2,
540 max_retries: 3,
541 created_at: now,
542 available_at: now,
543 state: JobState::Pending,
544 };
545
546 let json = serde_json::to_string(&job_info).unwrap();
547 let restored: JobInfo = serde_json::from_str(&json).unwrap();
548
549 assert_eq!(restored.id, "job-123");
550 assert_eq!(restored.job_type, "SendEmailJob");
551 assert_eq!(restored.queue, "emails");
552 assert_eq!(restored.attempts, 2);
553 assert_eq!(restored.max_retries, 3);
554 assert!(matches!(restored.state, JobState::Pending));
555 }
556
557 #[test]
558 fn test_job_info_clone() {
559 let now = Utc::now();
560 let job_info = JobInfo {
561 id: "job-456".to_string(),
562 job_type: "ProcessOrder".to_string(),
563 queue: "orders".to_string(),
564 attempts: 0,
565 max_retries: 5,
566 created_at: now,
567 available_at: now,
568 state: JobState::Delayed,
569 };
570
571 let cloned = job_info.clone();
572 assert_eq!(cloned.id, job_info.id);
573 assert_eq!(cloned.job_type, job_info.job_type);
574 }
575
576 #[test]
577 fn test_failed_job_info_serialization() {
578 let now = Utc::now();
579 let failed_job = FailedJobInfo {
580 job: JobInfo {
581 id: "job-789".to_string(),
582 job_type: "FailingJob".to_string(),
583 queue: "default".to_string(),
584 attempts: 3,
585 max_retries: 3,
586 created_at: now,
587 available_at: now,
588 state: JobState::Failed,
589 },
590 error: "Connection refused".to_string(),
591 failed_at: now,
592 };
593
594 let json = serde_json::to_string(&failed_job).unwrap();
595 let restored: FailedJobInfo = serde_json::from_str(&json).unwrap();
596
597 assert_eq!(restored.job.id, "job-789");
598 assert_eq!(restored.error, "Connection refused");
599 assert!(matches!(restored.job.state, JobState::Failed));
600 }
601
602 #[test]
603 fn test_failed_job_info_clone() {
604 let now = Utc::now();
605 let failed_job = FailedJobInfo {
606 job: JobInfo {
607 id: "job-999".to_string(),
608 job_type: "TestJob".to_string(),
609 queue: "test".to_string(),
610 attempts: 1,
611 max_retries: 3,
612 created_at: now,
613 available_at: now,
614 state: JobState::Failed,
615 },
616 error: "Test error".to_string(),
617 failed_at: now,
618 };
619
620 let cloned = failed_job.clone();
621 assert_eq!(cloned.job.id, failed_job.job.id);
622 assert_eq!(cloned.error, failed_job.error);
623 }
624
625 #[test]
626 fn test_job_state_debug() {
627 assert!(format!("{:?}", JobState::Pending).contains("Pending"));
628 assert!(format!("{:?}", JobState::Delayed).contains("Delayed"));
629 assert!(format!("{:?}", JobState::Failed).contains("Failed"));
630 }
631
632 }