qrush_engine/utils/
jconfig.rs1use crate::registry::get_job_handler;
4use crate::job::Job;
5use anyhow::Result;
6use serde::Serialize;
7use crate::utils::rdconfig::get_redis_connection;
8use redis::AsyncCommands;
9
10#[derive(Serialize, Debug, Clone)]
11pub struct JobInfo {
12 pub id: String,
13 pub job_type: String,
14 pub queue: Option<String>,
15 pub payload: Option<String>,
16 pub status: Option<String>,
17 pub created_at: Option<String>,
18 pub updated_at: Option<String>,
19 pub run_at: Option<String>,
20 pub error: Option<String>,
21 pub failed_at: Option<String>,
22}
23
24pub fn to_job_info(job: &Box<dyn Job>, id: &str) -> JobInfo {
25 JobInfo {
26 id: id.to_string(),
27 job_type: job.name().to_string(),
28 queue: Some(job.queue().to_string()),
29 payload: Some("N/A".to_string()),
30 status: None,
31 created_at: None,
32 updated_at: None,
33 failed_at: None,
34 error: None,
35 run_at: None,
36 }
37}
38
39pub fn extract_job_type(payload: &str) -> Option<String> {
40 let v: serde_json::Value = serde_json::from_str(payload).ok()?;
41 v.get("job_type")?.as_str().map(String::from)
42}
43
44pub async fn deserialize_job(payload: String) -> Option<Box<dyn Job>> {
45 let job_type = extract_job_type(&payload)?;
46 let handler = get_job_handler(&job_type)?;
47 match handler(payload).await {
48 Ok(job) => Some(job),
49 Err(err) => {
50 tracing::error!("Failed to deserialize job '{}': {:?}", job_type, err);
51 None
52 }
53 }
54}
55
56pub async fn fetch_job_info(job_id: &str) -> Result<Option<JobInfo>> {
57 let job_key = format!("xsm:job:{job_id}");
58 let mut conn = get_redis_connection().await?;
59
60 let map: redis::RedisResult<redis::Value> = conn.hgetall(&job_key).await;
61
62 if let Ok(redis::Value::Array(items)) = map { if items.is_empty() {
64 return Ok(None);
65 }
66
67 let mut job_info = JobInfo {
68 id: job_id.to_string(),
69 job_type: "unknown".to_string(),
70 queue: None,
71 status: None,
72 payload: None,
73 created_at: None,
74 updated_at: None,
75 run_at: None,
76 error: None,
77 failed_at: None,
78 };
79
80 for chunk in items.chunks(2) {
81 if let [redis::Value::BulkString(field), redis::Value::BulkString(value)] = chunk { let key = String::from_utf8_lossy(field);
83 let val = String::from_utf8_lossy(value);
84
85 match key.as_ref() {
86 "queue" => job_info.queue = Some(val.to_string()),
87 "status" => job_info.status = Some(val.to_string()),
88 "payload" => {
89 job_info.payload = Some(val.to_string());
90 if let Some(job_type) = extract_job_type(&val) {
91 job_info.job_type = job_type;
92 }
93 }
94 "created_at" => job_info.created_at = Some(val.to_string()),
95 "updated_at" => job_info.updated_at = Some(val.to_string()),
96 "run_at" => job_info.run_at = Some(val.to_string()),
97 "error" => job_info.error = Some(val.to_string()),
98 "failed_at" => job_info.failed_at = Some(val.to_string()),
99 _ => {}
100 }
101 }
102 }
103
104 return Ok(Some(job_info));
105 }
106
107 Ok(None)
108}