1use crate::job::Job;
3use crate::utils::rdconfig::get_redis_connection;
4use crate::utils::constants::{
5 PREFIX_JOB,
6 DELAYED_JOBS_KEY,
7 PREFIX_QUEUE,
8};
9
10
11use serde::Serialize;
12use serde_json::to_string;
13use redis::AsyncCommands;
14use chrono::Utc;
15use nanoid::nanoid;
16
17pub async fn enqueue<J>(job: J) -> anyhow::Result<String>
18where
19 J: Job + Serialize,
20{
21 let mut conn = get_redis_connection().await?;
22 let payload = to_string(&job)?;
23 let job_id = nanoid!(10);
24 let now = Utc::now().to_rfc3339();
25
26 let queue = job.queue();
27 let queue_key = format!("{PREFIX_QUEUE}:{}", queue);
28 let job_key = format!("{PREFIX_JOB}:{job_id}");
29
30 conn.hset_multiple::<_, _, _, ()>(&job_key, &[
31 ("queue", queue),
32 ("status", "pending"),
33 ("payload", &payload),
34 ("created_at", &now),
35 ]).await?;
36
37 conn.rpush::<_, _, ()>(&queue_key, &job_id).await?;
38 conn.sadd::<_, _, ()>("snm:queues", queue).await?;
39
40 println!("✅ Enqueued job: {} in queue: {}", job_id, queue);
41 Ok(job_id)
42}
43
44
45
46
47pub async fn enqueue_in<J>(job: J, delay_secs: u64) -> anyhow::Result<String>
48where
49 J: Job + Serialize,
50{
51 let mut conn = get_redis_connection().await?;
52 let payload = to_string(&job)?;
53 let job_id = nanoid!(10);
54 let now = Utc::now().to_rfc3339();
55 let run_at = Utc::now().timestamp() + delay_secs as i64;
56
57 let queue = job.queue();
58 let job_key = format!("{PREFIX_JOB}:{job_id}");
59
60 conn.hset_multiple::<_, _, _, ()>(&job_key, &[
61 ("queue", queue),
62 ("status", "delayed"),
63 ("payload", &payload),
64 ("created_at", &now),
65 ("run_at", &run_at.to_string()),
66 ]).await?;
67
68 conn.zadd::<_, _, _, ()>(DELAYED_JOBS_KEY, &job_id, run_at).await?;
69 conn.sadd::<_, _, ()>("snm:queues", queue).await?;
70
71 println!("⏳ Scheduled job: {} to run at: {}", job_id, run_at);
72 Ok(job_id)
73}
74
75
76pub async fn enqueue_raw(queue: &str, payload: String) -> anyhow::Result<String> {
81 let mut conn = get_redis_connection().await?;
82 let job_id = nanoid!(10);
83 let now = Utc::now().to_rfc3339();
84
85 let queue_key = format!("{PREFIX_QUEUE}:{}", queue);
86 let job_key = format!("{PREFIX_JOB}:{job_id}");
87
88 conn.hset_multiple::<_, _, _, ()>(&job_key, &[
89 ("queue", queue),
90 ("status", "pending"),
91 ("payload", &payload),
92 ("created_at", &now),
93 ]).await?;
94
95 conn.rpush::<_, _, ()>(&queue_key, &job_id).await?;
96 conn.sadd::<_, _, ()>("snm:queues", queue).await?;
97
98 Ok(job_id)
99}