// qrush_engine/services/runner_service.rs

// /Users/snm/ws/xsnm/ws/crates/qrush-engine/src/services/runner_service.rs
2use crate::{registry::get_registered_jobs, config::get_shutdown_notify};
3use crate::utils::rdconfig::get_redis_connection;
4use tokio::time::{sleep, Duration};
5use redis::AsyncCommands;
6use chrono::Utc;
7use futures::FutureExt; // Required for `.now_or_never()`
8use crate::utils::constants::{
9    DELAYED_JOBS_KEY, MAX_RETRIES, PREFIX_QUEUE, PREFIX_JOB,
10    COUNTER_SUCCESS, COUNTER_FAILED, COUNTER_TOTAL_JOBS,
11    SUCCESS_LIST_PREFIX, FAILED_LIST_PREFIX, RETRY_LIST_PREFIX,
12    STATS_JOBS_PREFIX, STATS_JOBS_FAILED_PREFIX, LOGS_PREFIX,
13    FAILED_JOBS_LIST,
14};
15
16
17
18pub async fn start_worker_pool(queue: &str, concurrency: usize) {
19    let shutdown = get_shutdown_notify();    
20    for _ in 0..concurrency {
21        let queue = queue.to_string();
22        let shutdown = shutdown.clone();
23
24        tokio::spawn(async move {
25            let mut error_message: Option<String> = None;
26            let today = Utc::now().date_naive().format("%Y-%m-%d").to_string();
27
28            loop {
29                if shutdown.notified().now_or_never().is_some() {
30                    break;
31                }
32
33                let mut conn = match get_redis_connection().await {
34                    Ok(c) => c,
35                    Err(_) => {
36                        sleep(Duration::from_secs(1)).await;
37                        continue;
38                    }
39                };
40
41                let job_id: Option<String> = conn
42                    .lpop(format!("{PREFIX_QUEUE}:{}", queue), None)
43                    .await
44                    .unwrap_or(None);
45
46                if let Some(job_id) = job_id {
47                    let job_key = format!("{PREFIX_JOB}:{}", job_id);
48                    let job_payload: String = conn.hget(&job_key, "payload").await.unwrap_or_default();
49
50                    let jobs = get_registered_jobs();
51                    let mut handled = false;
52
53                    for (_job_name, handler) in &jobs {
54                        let job_result = handler(job_payload.clone()).await;
55
56                        match job_result {
57                            Ok(job) => {
58                                if let Err(_) = job.before().await {
59                                    let _: () = conn.hset_multiple(&job_key, &[
60                                        ("status", "skipped"),
61                                        ("skipped_at", &Utc::now().to_rfc3339()),
62                                    ]).await.unwrap_or_default();
63                                    break;
64                                }
65
66                                match job.perform().await {
67                                    Ok(_) => {
68                                        let _ = job.after().await;
69                                        let _: () = conn.hset_multiple(&job_key, &[
70                                            ("status", "success"),
71                                            ("completed_at", &Utc::now().to_rfc3339()),
72                                        ]).await.unwrap_or_default();
73                                        let _: () = conn.incr(COUNTER_SUCCESS, 1).await.unwrap_or_default();
74                                        // Track success jobs
75                                        let _: () = conn.rpush(format!("{SUCCESS_LIST_PREFIX}:{}", queue), &job_id).await.unwrap_or_default();
76
77                                        let key = format!("{STATS_JOBS_PREFIX}:{}", today);
78                                        // increment daily success counter
79                                        let _: () = conn.incr(&key, 1).await.unwrap_or_default();
80                                        let _: () = conn.incr(COUNTER_TOTAL_JOBS, 1).await.unwrap_or_default();
81
82                                    }
83                                    Err(err) => {
84                                        let _ = job.on_error(&err).await;
85                                        let retries: i64 = conn.hincr(&job_key, "retries", 1).await.unwrap_or(1);
86                                        let _: () = conn.rpush(format!("{RETRY_LIST_PREFIX}:{}", queue), &job_id).await.unwrap_or_default();
87                                        if retries <= MAX_RETRIES as i64 {
88                                            let backoff = 10 * retries;
89                                            let now = Utc::now().timestamp();
90                                            let _: () = conn.zadd(DELAYED_JOBS_KEY, &job_id, now + backoff).await.unwrap_or_default();
91                                        }
92                                    }
93                                }
94
95                                let _ = job.always().await;
96                                handled = true;
97                                break;
98                            }
99                            Err(e) => {
100                                error_message = Some(e.to_string());
101                            }
102                        }
103                    }
104
105                    if !handled {
106                        let mut hset_data = vec![
107                            ("status", "failed".to_string()),
108                            ("failed_at", Utc::now().to_rfc3339()),
109                            ("queue", queue.clone()),
110                            ("failed_at", Utc::now().to_rfc3339()),
111                        ];
112
113                        if let Some(ref emsg) = error_message {
114                            hset_data.push(("error", emsg.clone()));
115                        }
116
117                        let fail_key = format!("{STATS_JOBS_FAILED_PREFIX}:{}:failed", today);
118                        // increment daily failed counter
119                        let _: () = conn.incr(&fail_key, 1).await.unwrap_or_default();
120
121
122                        let _: () = conn.hset_multiple(&job_key, &hset_data).await.unwrap_or_default();
123                        
124                        let _: Result<(), _> = conn.lpush(
125                            format!("{LOGS_PREFIX}:{}", queue),
126                            format!("[{}] ❌ Job {} failed", Utc::now(), job_id),
127                        ).await;
128                        let _: Result<(), _> = conn.ltrim(format!("{LOGS_PREFIX}:{}", queue), 0, 99).await;
129                        let _: () = conn.rpush(FAILED_JOBS_LIST, &job_id).await.unwrap_or_default();
130                        let _: () = conn.hset(&job_key, "job_name", jobs.keys().next().unwrap_or(&"unknown")).await.unwrap_or_default();
131                        // Track failed jobs
132                        let _: () = conn.rpush(format!("{FAILED_LIST_PREFIX}:{}", queue), &job_id).await.unwrap_or_default();
133                        let _: () = conn.incr(COUNTER_FAILED, 1).await.unwrap_or_default();
134                    }
135                }
136
137                sleep(Duration::from_millis(500)).await;
138            }
139        });
140    }
141}
142
143
144pub async fn start_delayed_worker_pool() {
145    tokio::spawn(async move {
146        loop {
147            let now = chrono::Utc::now().timestamp();
148            let mut conn = match get_redis_connection().await {
149                Ok(c) => c,
150                Err(_) => {
151                    tokio::time::sleep(Duration::from_secs(1)).await;
152                    continue;
153                }
154            };
155
156            let jobs: Vec<String> = conn.zrangebyscore(DELAYED_JOBS_KEY, 0, now).await.unwrap_or_default();
157            for job_str in jobs {
158                let _: () = conn.lpush(format!("{PREFIX_QUEUE}:default"), &job_str).await.unwrap_or_default();
159                let _: () = conn.zrem(DELAYED_JOBS_KEY, &job_str).await.unwrap_or_default();
160            }
161            tokio::time::sleep(Duration::from_secs(5)).await;
162        }
163    });
164}
165
166