qrush_engine/utils/
jconfig.rs

1// src/utils/jconfig.rs
2
3use crate::registry::get_job_handler;
4use crate::job::Job;
5use anyhow::Result;
6use serde::Serialize;
7use crate::utils::rdconfig::get_redis_connection;
8use redis::AsyncCommands;
9
10#[derive(Serialize, Debug, Clone)]
11pub struct JobInfo {
12    pub id: String,
13    pub job_type: String,
14    pub queue: Option<String>,
15    pub payload: Option<String>,
16    pub status: Option<String>,
17    pub created_at: Option<String>,
18    pub updated_at: Option<String>,
19    pub run_at: Option<String>,
20    pub error: Option<String>,
21    pub failed_at: Option<String>,
22}
23
24pub fn to_job_info(job: &Box<dyn Job>, id: &str) -> JobInfo {
25    JobInfo {
26        id: id.to_string(),
27        job_type: job.name().to_string(),
28        queue: Some(job.queue().to_string()),
29        payload: Some("N/A".to_string()),
30        status: None,
31        created_at: None,
32        updated_at: None,
33        failed_at: None,
34        error: None,
35        run_at: None,
36    }
37}
38
39pub fn extract_job_type(payload: &str) -> Option<String> {
40    let v: serde_json::Value = serde_json::from_str(payload).ok()?;
41    v.get("job_type")?.as_str().map(String::from)
42}
43
44pub async fn deserialize_job(payload: String) -> Option<Box<dyn Job>> {
45    let job_type = extract_job_type(&payload)?;
46    let handler = get_job_handler(&job_type)?;
47    match handler(payload).await {
48        Ok(job) => Some(job),
49        Err(err) => {
50            tracing::error!("Failed to deserialize job '{}': {:?}", job_type, err);
51            None
52        }
53    }
54}
55
56pub async fn fetch_job_info(job_id: &str) -> Result<Option<JobInfo>> {
57    let job_key = format!("xsm:job:{job_id}");
58    let mut conn = get_redis_connection().await?;
59
60    let map: redis::RedisResult<redis::Value> = conn.hgetall(&job_key).await;
61
62    if let Ok(redis::Value::Array(items)) = map {  // Changed from Bulk to Array
63        if items.is_empty() {
64            return Ok(None);
65        }
66
67        let mut job_info = JobInfo {
68            id: job_id.to_string(),
69            job_type: "unknown".to_string(),
70            queue: None,
71            status: None,
72            payload: None,
73            created_at: None,
74            updated_at: None,
75            run_at: None,
76            error: None,
77            failed_at: None,
78        };
79
80        for chunk in items.chunks(2) {
81            if let [redis::Value::BulkString(field), redis::Value::BulkString(value)] = chunk {  // Changed from Data to BulkString
82                let key = String::from_utf8_lossy(field);
83                let val = String::from_utf8_lossy(value);
84
85                match key.as_ref() {
86                    "queue" => job_info.queue = Some(val.to_string()),
87                    "status" => job_info.status = Some(val.to_string()),
88                    "payload" => {
89                        job_info.payload = Some(val.to_string());
90                        if let Some(job_type) = extract_job_type(&val) {
91                            job_info.job_type = job_type;
92                        }
93                    }
94                    "created_at" => job_info.created_at = Some(val.to_string()),
95                    "updated_at" => job_info.updated_at = Some(val.to_string()),
96                    "run_at" => job_info.run_at = Some(val.to_string()),
97                    "error" => job_info.error = Some(val.to_string()),
98                    "failed_at" => job_info.failed_at = Some(val.to_string()),
99                    _ => {}
100                }
101            }
102        }
103
104        return Ok(Some(job_info));
105    }
106
107    Ok(None)
108}