1use std::sync::Arc;
2
3use redis::AsyncCommands;
4use serde::{Deserialize, Serialize};
5use shaperail_core::ShaperailError;
6use uuid::Uuid;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(rename_all = "lowercase")]
11pub enum JobPriority {
12 Critical,
13 High,
14 Normal,
15 Low,
16}
17
18impl JobPriority {
19 pub fn all() -> &'static [JobPriority] {
21 &[
22 JobPriority::Critical,
23 JobPriority::High,
24 JobPriority::Normal,
25 JobPriority::Low,
26 ]
27 }
28
29 pub fn queue_key(&self) -> &'static str {
31 match self {
32 JobPriority::Critical => "shaperail:jobs:queue:critical",
33 JobPriority::High => "shaperail:jobs:queue:high",
34 JobPriority::Normal => "shaperail:jobs:queue:normal",
35 JobPriority::Low => "shaperail:jobs:queue:low",
36 }
37 }
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(rename_all = "lowercase")]
43pub enum JobStatus {
44 Pending,
45 Running,
46 Completed,
47 Failed,
48}
49
50impl std::fmt::Display for JobStatus {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 match self {
53 JobStatus::Pending => write!(f, "pending"),
54 JobStatus::Running => write!(f, "running"),
55 JobStatus::Completed => write!(f, "completed"),
56 JobStatus::Failed => write!(f, "failed"),
57 }
58 }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct JobEnvelope {
64 pub id: String,
65 pub name: String,
66 pub payload: serde_json::Value,
67 pub priority: JobPriority,
68 pub max_retries: u32,
69 pub timeout_secs: u64,
70 pub attempt: u32,
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct JobInfo {
76 pub id: String,
77 pub name: String,
78 pub status: JobStatus,
79 pub attempt: u32,
80 pub max_retries: u32,
81 pub error: Option<String>,
82 pub created_at: String,
83 pub updated_at: String,
84}
85
86#[derive(Clone)]
91pub struct JobQueue {
92 pool: Arc<deadpool_redis::Pool>,
93}
94
95impl JobQueue {
96 pub fn new(pool: Arc<deadpool_redis::Pool>) -> Self {
98 Self { pool }
99 }
100
101 pub async fn enqueue(
105 &self,
106 name: &str,
107 payload: serde_json::Value,
108 priority: JobPriority,
109 ) -> Result<String, ShaperailError> {
110 self.enqueue_with_options(name, payload, priority, 3, 300)
111 .await
112 }
113
114 pub async fn enqueue_with_options(
119 &self,
120 name: &str,
121 payload: serde_json::Value,
122 priority: JobPriority,
123 max_retries: u32,
124 timeout_secs: u64,
125 ) -> Result<String, ShaperailError> {
126 let id = Uuid::new_v4().to_string();
127 let now = chrono::Utc::now().to_rfc3339();
128
129 let envelope = JobEnvelope {
130 id: id.clone(),
131 name: name.to_string(),
132 payload,
133 priority,
134 max_retries,
135 timeout_secs,
136 attempt: 0,
137 };
138
139 let envelope_json = serde_json::to_string(&envelope)
140 .map_err(|e| ShaperailError::Internal(format!("Failed to serialize job: {e}")))?;
141
142 let mut conn = self
143 .pool
144 .get()
145 .await
146 .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
147
148 let meta_key = format!("shaperail:jobs:meta:{id}");
150 redis::cmd("HSET")
151 .arg(&meta_key)
152 .arg("id")
153 .arg(&id)
154 .arg("name")
155 .arg(name)
156 .arg("status")
157 .arg(JobStatus::Pending.to_string())
158 .arg("attempt")
159 .arg("0")
160 .arg("max_retries")
161 .arg(max_retries.to_string())
162 .arg("created_at")
163 .arg(&now)
164 .arg("updated_at")
165 .arg(&now)
166 .query_async::<()>(&mut *conn)
167 .await
168 .map_err(|e| ShaperailError::Internal(format!("Failed to store job metadata: {e}")))?;
169
170 let _: Result<(), _> = conn.expire(&meta_key, 604800).await;
172
173 conn.rpush::<_, _, ()>(priority.queue_key(), &envelope_json)
175 .await
176 .map_err(|e| ShaperailError::Internal(format!("Failed to enqueue job: {e}")))?;
177
178 tracing::info!(job_id = %id, job_name = name, priority = ?priority, "Job enqueued");
179 Ok(id)
180 }
181
182 pub async fn get_status(&self, job_id: &str) -> Result<JobInfo, ShaperailError> {
184 let mut conn = self
185 .pool
186 .get()
187 .await
188 .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
189
190 let meta_key = format!("shaperail:jobs:meta:{job_id}");
191 let values: Vec<String> = redis::cmd("HGETALL")
192 .arg(&meta_key)
193 .query_async(&mut *conn)
194 .await
195 .map_err(|e| ShaperailError::Internal(format!("Failed to get job status: {e}")))?;
196
197 if values.is_empty() {
198 return Err(ShaperailError::NotFound);
199 }
200
201 let mut map = std::collections::HashMap::new();
203 for chunk in values.chunks(2) {
204 if chunk.len() == 2 {
205 map.insert(chunk[0].clone(), chunk[1].clone());
206 }
207 }
208
209 let status = match map.get("status").map(|s| s.as_str()) {
210 Some("pending") => JobStatus::Pending,
211 Some("running") => JobStatus::Running,
212 Some("completed") => JobStatus::Completed,
213 Some("failed") => JobStatus::Failed,
214 _ => JobStatus::Pending,
215 };
216
217 Ok(JobInfo {
218 id: map.get("id").cloned().unwrap_or_default(),
219 name: map.get("name").cloned().unwrap_or_default(),
220 status,
221 attempt: map.get("attempt").and_then(|s| s.parse().ok()).unwrap_or(0),
222 max_retries: map
223 .get("max_retries")
224 .and_then(|s| s.parse().ok())
225 .unwrap_or(3),
226 error: map.get("error").cloned(),
227 created_at: map.get("created_at").cloned().unwrap_or_default(),
228 updated_at: map.get("updated_at").cloned().unwrap_or_default(),
229 })
230 }
231
232 pub async fn total_depth(&self) -> Result<i64, ShaperailError> {
234 let mut conn = self
235 .pool
236 .get()
237 .await
238 .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
239
240 let mut total = 0_i64;
241 for priority in JobPriority::all() {
242 let len: i64 = redis::cmd("LLEN")
243 .arg(priority.queue_key())
244 .query_async(&mut *conn)
245 .await
246 .map_err(|e| {
247 ShaperailError::Internal(format!("Failed to inspect job queue depth: {e}"))
248 })?;
249 total += len;
250 }
251
252 Ok(total)
253 }
254
255 pub(crate) async fn update_status(
257 &self,
258 job_id: &str,
259 status: JobStatus,
260 attempt: u32,
261 error: Option<&str>,
262 ) -> Result<(), ShaperailError> {
263 let mut conn = self
264 .pool
265 .get()
266 .await
267 .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
268
269 let meta_key = format!("shaperail:jobs:meta:{job_id}");
270 let now = chrono::Utc::now().to_rfc3339();
271
272 let mut cmd = redis::cmd("HSET");
273 cmd.arg(&meta_key)
274 .arg("status")
275 .arg(status.to_string())
276 .arg("attempt")
277 .arg(attempt.to_string())
278 .arg("updated_at")
279 .arg(&now);
280
281 if let Some(err_msg) = error {
282 cmd.arg("error").arg(err_msg);
283 }
284
285 cmd.query_async::<()>(&mut *conn)
286 .await
287 .map_err(|e| ShaperailError::Internal(format!("Failed to update job status: {e}")))?;
288
289 Ok(())
290 }
291
292 pub(crate) async fn move_to_dead_letter(
294 &self,
295 envelope: &JobEnvelope,
296 error: &str,
297 ) -> Result<(), ShaperailError> {
298 let mut conn = self
299 .pool
300 .get()
301 .await
302 .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
303
304 let dead_entry = serde_json::json!({
305 "id": envelope.id,
306 "name": envelope.name,
307 "payload": envelope.payload,
308 "error": error,
309 "attempts": envelope.attempt,
310 "failed_at": chrono::Utc::now().to_rfc3339(),
311 });
312
313 let dead_json = serde_json::to_string(&dead_entry).map_err(|e| {
314 ShaperailError::Internal(format!("Failed to serialize dead letter: {e}"))
315 })?;
316
317 conn.rpush::<_, _, ()>("shaperail:jobs:dead", &dead_json)
318 .await
319 .map_err(|e| {
320 ShaperailError::Internal(format!("Failed to push to dead letter queue: {e}"))
321 })?;
322
323 self.update_status(
325 &envelope.id,
326 JobStatus::Failed,
327 envelope.attempt,
328 Some(error),
329 )
330 .await?;
331
332 tracing::warn!(
333 job_id = %envelope.id,
334 job_name = %envelope.name,
335 attempts = envelope.attempt,
336 "Job moved to dead letter queue"
337 );
338
339 Ok(())
340 }
341
342 pub(crate) async fn requeue_for_retry(
344 &self,
345 mut envelope: JobEnvelope,
346 ) -> Result<(), ShaperailError> {
347 envelope.attempt += 1;
348
349 let mut conn = self
350 .pool
351 .get()
352 .await
353 .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
354
355 let envelope_json = serde_json::to_string(&envelope)
356 .map_err(|e| ShaperailError::Internal(format!("Failed to serialize job: {e}")))?;
357
358 conn.rpush::<_, _, ()>(envelope.priority.queue_key(), &envelope_json)
360 .await
361 .map_err(|e| ShaperailError::Internal(format!("Failed to requeue job: {e}")))?;
362
363 self.update_status(&envelope.id, JobStatus::Pending, envelope.attempt, None)
364 .await?;
365
366 Ok(())
367 }
368
369 pub(crate) fn pool(&self) -> &Arc<deadpool_redis::Pool> {
371 &self.pool
372 }
373}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// Each priority maps to its own Redis list key.
    #[test]
    fn priority_queue_keys() {
        let expected = [
            (JobPriority::Critical, "shaperail:jobs:queue:critical"),
            (JobPriority::High, "shaperail:jobs:queue:high"),
            (JobPriority::Normal, "shaperail:jobs:queue:normal"),
            (JobPriority::Low, "shaperail:jobs:queue:low"),
        ];
        for (priority, key) in expected {
            assert_eq!(priority.queue_key(), key);
        }
    }

    /// `all()` lists the four priorities from `Critical` down to `Low`.
    #[test]
    fn priority_all_order() {
        let expected = [
            JobPriority::Critical,
            JobPriority::High,
            JobPriority::Normal,
            JobPriority::Low,
        ];
        let all = JobPriority::all();
        assert_eq!(all.len(), 4);
        for (actual, wanted) in all.iter().zip(expected.iter()) {
            assert_eq!(actual, wanted);
        }
    }

    /// `Display` renders every status as its lowercase name.
    #[test]
    fn job_status_display() {
        let expected = [
            (JobStatus::Pending, "pending"),
            (JobStatus::Running, "running"),
            (JobStatus::Completed, "completed"),
            (JobStatus::Failed, "failed"),
        ];
        for (status, text) in expected {
            assert_eq!(status.to_string(), text);
        }
    }

    /// An envelope survives a JSON serialize/deserialize round trip.
    #[test]
    fn job_envelope_serde_roundtrip() {
        let original = JobEnvelope {
            id: "test-id".to_string(),
            name: "send_email".to_string(),
            payload: serde_json::json!({"user_id": "123"}),
            priority: JobPriority::Normal,
            max_retries: 3,
            timeout_secs: 300,
            attempt: 0,
        };

        let encoded = serde_json::to_string(&original).expect("envelope serializes");
        let decoded: JobEnvelope = serde_json::from_str(&encoded).expect("envelope deserializes");

        assert_eq!(decoded.id, "test-id");
        assert_eq!(decoded.name, "send_email");
        assert_eq!(decoded.priority, JobPriority::Normal);
        assert_eq!(decoded.max_retries, 3);
        assert_eq!(decoded.timeout_secs, 300);
        assert_eq!(decoded.attempt, 0);
    }

    /// Priorities serialize to lowercase JSON strings and parse back.
    #[test]
    fn job_priority_serde() {
        let encoded = serde_json::to_string(&JobPriority::Critical).expect("priority serializes");
        assert_eq!(encoded, "\"critical\"");

        let decoded: JobPriority = serde_json::from_str(&encoded).expect("priority deserializes");
        assert_eq!(decoded, JobPriority::Critical);
    }

    /// Statuses serialize to lowercase JSON strings and parse back.
    #[test]
    fn job_status_serde() {
        let encoded = serde_json::to_string(&JobStatus::Completed).expect("status serializes");
        assert_eq!(encoded, "\"completed\"");

        let decoded: JobStatus = serde_json::from_str(&encoded).expect("status deserializes");
        assert_eq!(decoded, JobStatus::Completed);
    }
}