qrush_engine/utils/
jconfig.rs

1// src/utils/jconfig.rs
2
3use crate::registry::get_job_handler;
4use crate::job::Job;
5use anyhow::Result;
6use serde::Serialize;
7use crate::utils::rdconfig::get_redis_connection;
8use crate::utils::constants::PREFIX_JOB;
9use redis::AsyncCommands;
10
11#[derive(Serialize, Debug, Clone)]
12pub struct JobInfo {
13    pub id: String,
14    pub job_type: String,
15    pub queue: Option<String>,
16    pub payload: Option<String>,
17    pub status: Option<String>,
18    pub created_at: Option<String>,
19    pub updated_at: Option<String>,
20    pub run_at: Option<String>,
21    pub error: Option<String>,
22    pub failed_at: Option<String>,
23}
24
25pub fn to_job_info(job: &Box<dyn Job>, id: &str) -> JobInfo {
26    JobInfo {
27        id: id.to_string(),
28        job_type: job.name().to_string(),
29        queue: Some(job.queue().to_string()),
30        payload: Some("N/A".to_string()),
31        status: None,
32        created_at: None,
33        updated_at: None,
34        failed_at: None,
35        error: None,
36        run_at: None,
37    }
38}
39
40pub fn extract_job_type(payload: &str) -> Option<String> {
41    let v: serde_json::Value = serde_json::from_str(payload).ok()?;
42    v.get("job_type")?.as_str().map(String::from)
43}
44
45pub async fn deserialize_job(payload: String) -> Option<Box<dyn Job>> {
46    let job_type = extract_job_type(&payload)?;
47    let handler = get_job_handler(&job_type)?;
48    match handler(payload).await {
49        Ok(job) => Some(job),
50        Err(err) => {
51            tracing::error!("Failed to deserialize job '{}': {:?}", job_type, err);
52            None
53        }
54    }
55}
56
57pub async fn fetch_job_info(job_id: &str) -> Result<Option<JobInfo>> {
58    let job_key = format!("{PREFIX_JOB}:{job_id}");
59    let mut conn = get_redis_connection().await?;
60
61    let map: redis::RedisResult<redis::Value> = conn.hgetall(&job_key).await;
62
63    if let Ok(redis::Value::Array(items)) = map {  // Changed from Bulk to Array
64        if items.is_empty() {
65            return Ok(None);
66        }
67
68        let mut job_info = JobInfo {
69            id: job_id.to_string(),
70            job_type: "unknown".to_string(),
71            queue: None,
72            status: None,
73            payload: None,
74            created_at: None,
75            updated_at: None,
76            run_at: None,
77            error: None,
78            failed_at: None,
79        };
80
81        for chunk in items.chunks(2) {
82            if let [redis::Value::BulkString(field), redis::Value::BulkString(value)] = chunk {  // Changed from Data to BulkString
83                let key = String::from_utf8_lossy(field);
84                let val = String::from_utf8_lossy(value);
85
86                match key.as_ref() {
87                    "queue" => job_info.queue = Some(val.to_string()),
88                    "status" => job_info.status = Some(val.to_string()),
89                    "payload" => {
90                        job_info.payload = Some(val.to_string());
91                        if let Some(job_type) = extract_job_type(&val) {
92                            job_info.job_type = job_type;
93                        }
94                    }
95                    "created_at" => job_info.created_at = Some(val.to_string()),
96                    "updated_at" => job_info.updated_at = Some(val.to_string()),
97                    "run_at" => job_info.run_at = Some(val.to_string()),
98                    "error" => job_info.error = Some(val.to_string()),
99                    "failed_at" => job_info.failed_at = Some(val.to_string()),
100                    _ => {}
101                }
102            }
103        }
104
105        return Ok(Some(job_info));
106    }
107
108    Ok(None)
109}