qrush_engine/services/
runner_service.rs

1
2use crate::{registry::get_registered_jobs, config::get_shutdown_notify};
3use crate::utils::rdconfig::get_redis_connection;
4use tokio::time::{sleep, Duration};
5use redis::AsyncCommands;
6use chrono::Utc;
7use futures::FutureExt; // Required for `.now_or_never()`
8use crate::utils::constants::{DELAYED_JOBS_KEY, MAX_RETRIES};
9
10
11
12pub async fn start_worker_pool(queue: &str, concurrency: usize) {
13    let shutdown = get_shutdown_notify();    
14    for _ in 0..concurrency {
15        let queue = queue.to_string();
16        let shutdown = shutdown.clone();
17
18        tokio::spawn(async move {
19            let mut error_message: Option<String> = None;
20            let today = Utc::now().date_naive().format("%Y-%m-%d").to_string();
21
22            loop {
23                if shutdown.notified().now_or_never().is_some() {
24                    break;
25                }
26
27                let mut conn = match get_redis_connection().await {
28                    Ok(c) => c,
29                    Err(_) => {
30                        sleep(Duration::from_secs(1)).await;
31                        continue;
32                    }
33                };
34
35                let job_id: Option<String> = conn
36                    .lpop(format!("xsm:queue:{}", queue), None)
37                    .await
38                    .unwrap_or(None);
39
40                if let Some(job_id) = job_id {
41                    let job_key = format!("xsm:job:{}", job_id);
42                    let job_payload: String = conn.hget(&job_key, "payload").await.unwrap_or_default();
43
44                    let jobs = get_registered_jobs();
45                    let mut handled = false;
46
47                    for (_job_name, handler) in &jobs {
48                        let job_result = handler(job_payload.clone()).await;
49
50                        match job_result {
51                            Ok(job) => {
52                                if let Err(_) = job.before().await {
53                                    let _: () = conn.hset_multiple(&job_key, &[
54                                        ("status", "skipped"),
55                                        ("skipped_at", &Utc::now().to_rfc3339()),
56                                    ]).await.unwrap_or_default();
57                                    break;
58                                }
59
60                                match job.perform().await {
61                                    Ok(_) => {
62                                        let _ = job.after().await;
63                                        let _: () = conn.hset_multiple(&job_key, &[
64                                            ("status", "success"),
65                                            ("completed_at", &Utc::now().to_rfc3339()),
66                                        ]).await.unwrap_or_default();
67                                        let _: () = conn.incr("xsm:qrush:success", 1).await.unwrap_or_default();
68                                        // Track success jobs
69                                        let _: () = conn.rpush(format!("xsm:success:{}", queue), &job_id).await.unwrap_or_default();
70
71                                        let key = format!("xsm:stats:jobs:{}", today);
72                                        // increment daily success counter
73                                        let _: () = conn.incr(&key, 1).await.unwrap_or_default();
74                                        let _: () = conn.incr("xsm:qrush:total_jobs", 1).await.unwrap_or_default();
75
76                                    }
77                                    Err(err) => {
78                                        let _ = job.on_error(&err).await;
79                                        let retries: i64 = conn.hincr(&job_key, "retries", 1).await.unwrap_or(1);
80                                        let _: () = conn.rpush(format!("xsm:retry:{}", queue), &job_id).await.unwrap_or_default();
81                                        if retries <= MAX_RETRIES as i64 {
82                                            let backoff = 10 * retries;
83                                            let now = Utc::now().timestamp();
84                                            let _: () = conn.zadd(DELAYED_JOBS_KEY, &job_id, now + backoff).await.unwrap_or_default();
85                                        }
86                                    }
87                                }
88
89                                let _ = job.always().await;
90                                handled = true;
91                                break;
92                            }
93                            Err(e) => {
94                                error_message = Some(e.to_string());
95                            }
96                        }
97                    }
98
99                    if !handled {
100                        let mut hset_data = vec![
101                            ("status", "failed".to_string()),
102                            ("failed_at", Utc::now().to_rfc3339()),
103                            ("queue", queue.clone()),
104                            ("failed_at", Utc::now().to_rfc3339()),
105                        ];
106
107                        if let Some(ref emsg) = error_message {
108                            hset_data.push(("error", emsg.clone()));
109                        }
110
111                        let fail_key = format!("xsm:stats:jobs:{}:failed", today);
112                        // increment daily failed counter
113                        let _: () = conn.incr(&fail_key, 1).await.unwrap_or_default();
114
115
116                        let _: () = conn.hset_multiple(&job_key, &hset_data).await.unwrap_or_default();
117                        
118                        let _: Result<(), _> = conn.lpush(
119                            format!("xsm:logs:{}", queue),
120                            format!("[{}] ❌ Job {} failed", Utc::now(), job_id),
121                        ).await;
122                        let _: Result<(), _> = conn.ltrim(format!("xsm:logs:{}", queue), 0, 99).await;
123                        let _: () = conn.rpush("xsm:failed_jobs", &job_id).await.unwrap_or_default();
124                        let _: () = conn.hset(&job_key, "job_name", jobs.keys().next().unwrap_or(&"unknown")).await.unwrap_or_default();
125                        // Track failed jobs
126                        let _: () = conn.rpush(format!("xsm:failed:{}", queue), &job_id).await.unwrap_or_default();
127                        let _: () = conn.incr("xsm:qrush:failed", 1).await.unwrap_or_default();
128                    }
129                }
130
131                sleep(Duration::from_millis(500)).await;
132            }
133        });
134    }
135}
136
137
138pub async fn start_delayed_worker_pool() {
139    tokio::spawn(async move {
140        loop {
141            let now = chrono::Utc::now().timestamp();
142            let mut conn = match get_redis_connection().await {
143                Ok(c) => c,
144                Err(_) => {
145                    tokio::time::sleep(Duration::from_secs(1)).await;
146                    continue;
147                }
148            };
149
150            let jobs: Vec<String> = conn.zrangebyscore(DELAYED_JOBS_KEY, 0, now).await.unwrap_or_default();
151            for job_str in jobs {
152                let _: () = conn.lpush("xsm:queue:default", &job_str).await.unwrap_or_default();
153                let _: () = conn.zrem(DELAYED_JOBS_KEY, &job_str).await.unwrap_or_default();
154            }
155            tokio::time::sleep(Duration::from_secs(5)).await;
156        }
157    });
158}
159
160