qrush_engine/utils/
jconfig.rs1use crate::registry::get_job_handler;
4use crate::job::Job;
5use anyhow::Result;
6use serde::Serialize;
7use crate::utils::rdconfig::get_redis_connection;
8use crate::utils::constants::PREFIX_JOB;
9use redis::AsyncCommands;
10
11#[derive(Serialize, Debug, Clone)]
12pub struct JobInfo {
13 pub id: String,
14 pub job_type: String,
15 pub queue: Option<String>,
16 pub payload: Option<String>,
17 pub status: Option<String>,
18 pub created_at: Option<String>,
19 pub updated_at: Option<String>,
20 pub run_at: Option<String>,
21 pub error: Option<String>,
22 pub failed_at: Option<String>,
23}
24
25pub fn to_job_info(job: &Box<dyn Job>, id: &str) -> JobInfo {
26 JobInfo {
27 id: id.to_string(),
28 job_type: job.name().to_string(),
29 queue: Some(job.queue().to_string()),
30 payload: Some("N/A".to_string()),
31 status: None,
32 created_at: None,
33 updated_at: None,
34 failed_at: None,
35 error: None,
36 run_at: None,
37 }
38}
39
40pub fn extract_job_type(payload: &str) -> Option<String> {
41 let v: serde_json::Value = serde_json::from_str(payload).ok()?;
42 v.get("job_type")?.as_str().map(String::from)
43}
44
45pub async fn deserialize_job(payload: String) -> Option<Box<dyn Job>> {
46 let job_type = extract_job_type(&payload)?;
47 let handler = get_job_handler(&job_type)?;
48 match handler(payload).await {
49 Ok(job) => Some(job),
50 Err(err) => {
51 tracing::error!("Failed to deserialize job '{}': {:?}", job_type, err);
52 None
53 }
54 }
55}
56
57pub async fn fetch_job_info(job_id: &str) -> Result<Option<JobInfo>> {
58 let job_key = format!("{PREFIX_JOB}:{job_id}");
59 let mut conn = get_redis_connection().await?;
60
61 let map: redis::RedisResult<redis::Value> = conn.hgetall(&job_key).await;
62
63 if let Ok(redis::Value::Array(items)) = map { if items.is_empty() {
65 return Ok(None);
66 }
67
68 let mut job_info = JobInfo {
69 id: job_id.to_string(),
70 job_type: "unknown".to_string(),
71 queue: None,
72 status: None,
73 payload: None,
74 created_at: None,
75 updated_at: None,
76 run_at: None,
77 error: None,
78 failed_at: None,
79 };
80
81 for chunk in items.chunks(2) {
82 if let [redis::Value::BulkString(field), redis::Value::BulkString(value)] = chunk { let key = String::from_utf8_lossy(field);
84 let val = String::from_utf8_lossy(value);
85
86 match key.as_ref() {
87 "queue" => job_info.queue = Some(val.to_string()),
88 "status" => job_info.status = Some(val.to_string()),
89 "payload" => {
90 job_info.payload = Some(val.to_string());
91 if let Some(job_type) = extract_job_type(&val) {
92 job_info.job_type = job_type;
93 }
94 }
95 "created_at" => job_info.created_at = Some(val.to_string()),
96 "updated_at" => job_info.updated_at = Some(val.to_string()),
97 "run_at" => job_info.run_at = Some(val.to_string()),
98 "error" => job_info.error = Some(val.to_string()),
99 "failed_at" => job_info.failed_at = Some(val.to_string()),
100 _ => {}
101 }
102 }
103 }
104
105 return Ok(Some(job_info));
106 }
107
108 Ok(None)
109}