qrush_engine/
queue.rs

1// src/queue.rs
2use crate::job::Job;
3use crate::utils::rdconfig::get_redis_connection;
4use crate::utils::constants::{
5    PREFIX_JOB,
6    DELAYED_JOBS_KEY,
7    PREFIX_QUEUE,
8};
9
10
11use serde::Serialize;
12use serde_json::to_string;
13use redis::AsyncCommands;
14use chrono::Utc;
15use nanoid::nanoid;
16
17pub async fn enqueue<J>(job: J) -> anyhow::Result<String>
18where
19    J: Job + Serialize,
20{
21    let mut conn = get_redis_connection().await?;
22    let payload = to_string(&job)?;
23    let job_id = nanoid!(10);
24    let now = Utc::now().to_rfc3339();
25
26    let queue = job.queue();
27    let queue_key = format!("{PREFIX_QUEUE}:{}", queue);
28    let job_key = format!("{PREFIX_JOB}:{job_id}");
29
30    conn.hset_multiple::<_, _, _, ()>(&job_key, &[
31        ("queue", queue),
32        ("status", "pending"),
33        ("payload", &payload),
34        ("created_at", &now),
35    ]).await?;
36
37    conn.rpush::<_, _, ()>(&queue_key, &job_id).await?;
38    conn.sadd::<_, _, ()>("xsm:queues", queue).await?;
39
40    println!("✅ Enqueued job: {} in queue: {}", job_id, queue);
41    Ok(job_id)
42}
43
44
45
46
47pub async fn enqueue_in<J>(job: J, delay_secs: u64) -> anyhow::Result<String>
48where
49    J: Job + Serialize,
50{
51    let mut conn = get_redis_connection().await?;
52    let payload = to_string(&job)?;
53    let job_id = nanoid!(10);
54    let now = Utc::now().to_rfc3339();
55    let run_at = Utc::now().timestamp() + delay_secs as i64;
56
57    let queue = job.queue();
58    let job_key = format!("{PREFIX_JOB}:{job_id}");
59
60    conn.hset_multiple::<_, _, _, ()>(&job_key, &[
61        ("queue", queue),
62        ("status", "delayed"),
63        ("payload", &payload),
64        ("created_at", &now),
65        ("run_at", &run_at.to_string()),
66    ]).await?;
67
68    conn.zadd::<_, _, _, ()>(DELAYED_JOBS_KEY, &job_id, run_at).await?;
69    conn.sadd::<_, _, ()>("xsm:queues", queue).await?;
70
71    println!("⏳ Scheduled job: {} to run at: {}", job_id, run_at);
72    Ok(job_id)
73}
74
75
76/// Enqueue a job payload directly into a named queue.
77///
78/// This is used by the cron scheduler and other producers that already have a JSON payload.
79/// The payload must match one of the registered job handlers.
80pub async fn enqueue_raw(queue: &str, payload: String) -> anyhow::Result<String> {
81    let mut conn = get_redis_connection().await?;
82    let job_id = nanoid!(10);
83    let now = Utc::now().to_rfc3339();
84
85    let queue_key = format!("{PREFIX_QUEUE}:{}", queue);
86    let job_key = format!("{PREFIX_JOB}:{job_id}");
87
88    conn.hset_multiple::<_, _, _, ()>(&job_key, &[
89        ("queue", queue),
90        ("status", "pending"),
91        ("payload", &payload),
92        ("created_at", &now),
93    ]).await?;
94
95    conn.rpush::<_, _, ()>(&queue_key, &job_id).await?;
96    conn.sadd::<_, _, ()>("xsm:queues", queue).await?;
97
98    Ok(job_id)
99}