1use crate::job::Job;
3use crate::utils::rdconfig::get_redis_connection;
4use crate::utils::constants::{
5 PREFIX_JOB,
6 DELAYED_JOBS_KEY,
7 PREFIX_QUEUE,
8 QUEUES_SET,
9};
10
11
12use serde::Serialize;
13use serde_json::to_string;
14use redis::AsyncCommands;
15use chrono::Utc;
16use nanoid::nanoid;
17
18pub async fn enqueue<J>(job: J) -> anyhow::Result<String>
19where
20 J: Job + Serialize,
21{
22 let mut conn = get_redis_connection().await?;
23 let payload = to_string(&job)?;
24 let job_id = nanoid!(10);
25 let now = Utc::now().to_rfc3339();
26
27 let queue = job.queue();
28 let queue_key = format!("{PREFIX_QUEUE}:{}", queue);
29 let job_key = format!("{PREFIX_JOB}:{job_id}");
30
31 conn.hset_multiple::<_, _, _, ()>(&job_key, &[
32 ("queue", queue),
33 ("status", "pending"),
34 ("payload", &payload),
35 ("created_at", &now),
36 ]).await?;
37
38 conn.rpush::<_, _, ()>(&queue_key, &job_id).await?;
39 conn.sadd::<_, _, ()>(QUEUES_SET, queue).await?;
40
41 println!("✅ Enqueued job: {} in queue: {}", job_id, queue);
42 Ok(job_id)
43}
44
45
46
47
48pub async fn enqueue_in<J>(job: J, delay_secs: u64) -> anyhow::Result<String>
49where
50 J: Job + Serialize,
51{
52 let mut conn = get_redis_connection().await?;
53 let payload = to_string(&job)?;
54 let job_id = nanoid!(10);
55 let now = Utc::now().to_rfc3339();
56 let run_at = Utc::now().timestamp() + delay_secs as i64;
57
58 let queue = job.queue();
59 let job_key = format!("{PREFIX_JOB}:{job_id}");
60
61 conn.hset_multiple::<_, _, _, ()>(&job_key, &[
62 ("queue", queue),
63 ("status", "delayed"),
64 ("payload", &payload),
65 ("created_at", &now),
66 ("run_at", &run_at.to_string()),
67 ]).await?;
68
69 conn.zadd::<_, _, _, ()>(DELAYED_JOBS_KEY, &job_id, run_at).await?;
70 conn.sadd::<_, _, ()>(QUEUES_SET, queue).await?;
71
72 println!("⏳ Scheduled job: {} to run at: {}", job_id, run_at);
73 Ok(job_id)
74}
75
76
77pub async fn enqueue_raw(queue: &str, payload: String) -> anyhow::Result<String> {
82 let mut conn = get_redis_connection().await?;
83 let job_id = nanoid!(10);
84 let now = Utc::now().to_rfc3339();
85
86 let queue_key = format!("{PREFIX_QUEUE}:{}", queue);
87 let job_key = format!("{PREFIX_JOB}:{job_id}");
88
89 conn.hset_multiple::<_, _, _, ()>(&job_key, &[
90 ("queue", queue),
91 ("status", "pending"),
92 ("payload", &payload),
93 ("created_at", &now),
94 ]).await?;
95
96 conn.rpush::<_, _, ()>(&queue_key, &job_id).await?;
97 conn.sadd::<_, _, ()>(QUEUES_SET, queue).await?;
98
99 Ok(job_id)
100}