qrush_engine/
queue.rs

1// src/queue.rs
2use crate::job::Job;
3use crate::utils::rdconfig::get_redis_connection;
4use crate::utils::constants::{
5    PREFIX_JOB,
6    DELAYED_JOBS_KEY,
7    PREFIX_QUEUE,
8    QUEUES_SET,
9};
10
11
12use serde::Serialize;
13use serde_json::to_string;
14use redis::AsyncCommands;
15use chrono::Utc;
16use nanoid::nanoid;
17
18pub async fn enqueue<J>(job: J) -> anyhow::Result<String>
19where
20    J: Job + Serialize,
21{
22    let mut conn = get_redis_connection().await?;
23    let payload = to_string(&job)?;
24    let job_id = nanoid!(10);
25    let now = Utc::now().to_rfc3339();
26
27    let queue = job.queue();
28    let queue_key = format!("{PREFIX_QUEUE}:{}", queue);
29    let job_key = format!("{PREFIX_JOB}:{job_id}");
30
31    conn.hset_multiple::<_, _, _, ()>(&job_key, &[
32        ("queue", queue),
33        ("status", "pending"),
34        ("payload", &payload),
35        ("created_at", &now),
36    ]).await?;
37
38    conn.rpush::<_, _, ()>(&queue_key, &job_id).await?;
39    conn.sadd::<_, _, ()>(QUEUES_SET, queue).await?;
40
41    println!("✅ Enqueued job: {} in queue: {}", job_id, queue);
42    Ok(job_id)
43}
44
45
46
47
48pub async fn enqueue_in<J>(job: J, delay_secs: u64) -> anyhow::Result<String>
49where
50    J: Job + Serialize,
51{
52    let mut conn = get_redis_connection().await?;
53    let payload = to_string(&job)?;
54    let job_id = nanoid!(10);
55    let now = Utc::now().to_rfc3339();
56    let run_at = Utc::now().timestamp() + delay_secs as i64;
57
58    let queue = job.queue();
59    let job_key = format!("{PREFIX_JOB}:{job_id}");
60
61    conn.hset_multiple::<_, _, _, ()>(&job_key, &[
62        ("queue", queue),
63        ("status", "delayed"),
64        ("payload", &payload),
65        ("created_at", &now),
66        ("run_at", &run_at.to_string()),
67    ]).await?;
68
69    conn.zadd::<_, _, _, ()>(DELAYED_JOBS_KEY, &job_id, run_at).await?;
70    conn.sadd::<_, _, ()>(QUEUES_SET, queue).await?;
71
72    println!("⏳ Scheduled job: {} to run at: {}", job_id, run_at);
73    Ok(job_id)
74}
75
76
77/// Enqueue a job payload directly into a named queue.
78///
79/// This is used by the cron scheduler and other producers that already have a JSON payload.
80/// The payload must match one of the registered job handlers.
81pub async fn enqueue_raw(queue: &str, payload: String) -> anyhow::Result<String> {
82    let mut conn = get_redis_connection().await?;
83    let job_id = nanoid!(10);
84    let now = Utc::now().to_rfc3339();
85
86    let queue_key = format!("{PREFIX_QUEUE}:{}", queue);
87    let job_key = format!("{PREFIX_JOB}:{job_id}");
88
89    conn.hset_multiple::<_, _, _, ()>(&job_key, &[
90        ("queue", queue),
91        ("status", "pending"),
92        ("payload", &payload),
93        ("created_at", &now),
94    ]).await?;
95
96    conn.rpush::<_, _, ()>(&queue_key, &job_id).await?;
97    conn.sadd::<_, _, ()>(QUEUES_SET, queue).await?;
98
99    Ok(job_id)
100}