Skip to main content

shaperail_runtime/jobs/
queue.rs

1use std::sync::Arc;
2
3use redis::AsyncCommands;
4use serde::{Deserialize, Serialize};
5use shaperail_core::ShaperailError;
6use uuid::Uuid;
7
8/// Job priority levels, each backed by a separate Redis list.
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(rename_all = "lowercase")]
11pub enum JobPriority {
12    Critical,
13    High,
14    Normal,
15    Low,
16}
17
18impl JobPriority {
19    /// Returns all priorities in polling order (highest first).
20    pub fn all() -> &'static [JobPriority] {
21        &[
22            JobPriority::Critical,
23            JobPriority::High,
24            JobPriority::Normal,
25            JobPriority::Low,
26        ]
27    }
28
29    /// Redis list key for this priority level.
30    pub fn queue_key(&self) -> &'static str {
31        match self {
32            JobPriority::Critical => "shaperail:jobs:queue:critical",
33            JobPriority::High => "shaperail:jobs:queue:high",
34            JobPriority::Normal => "shaperail:jobs:queue:normal",
35            JobPriority::Low => "shaperail:jobs:queue:low",
36        }
37    }
38}
39
40/// Current status of a job.
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(rename_all = "lowercase")]
43pub enum JobStatus {
44    Pending,
45    Running,
46    Completed,
47    Failed,
48}
49
50impl std::fmt::Display for JobStatus {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        match self {
53            JobStatus::Pending => write!(f, "pending"),
54            JobStatus::Running => write!(f, "running"),
55            JobStatus::Completed => write!(f, "completed"),
56            JobStatus::Failed => write!(f, "failed"),
57        }
58    }
59}
60
61/// Serialized job envelope stored in Redis.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct JobEnvelope {
64    pub id: String,
65    pub name: String,
66    pub payload: serde_json::Value,
67    pub priority: JobPriority,
68    pub max_retries: u32,
69    pub timeout_secs: u64,
70    pub attempt: u32,
71}
72
73/// Metadata about a job stored in a Redis hash.
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct JobInfo {
76    pub id: String,
77    pub name: String,
78    pub status: JobStatus,
79    pub attempt: u32,
80    pub max_retries: u32,
81    pub error: Option<String>,
82    pub created_at: String,
83    pub updated_at: String,
84}
85
86/// Redis-backed job queue.
87///
88/// Enqueues jobs into priority-separated Redis lists.
89/// Job metadata is stored in Redis hashes for status queries.
90#[derive(Clone)]
91pub struct JobQueue {
92    pool: Arc<deadpool_redis::Pool>,
93}
94
95impl JobQueue {
96    /// Creates a new job queue backed by the given Redis pool.
97    pub fn new(pool: Arc<deadpool_redis::Pool>) -> Self {
98        Self { pool }
99    }
100
101    /// Enqueues a job with the given name, payload, and priority.
102    ///
103    /// Returns the generated job ID.
104    pub async fn enqueue(
105        &self,
106        name: &str,
107        payload: serde_json::Value,
108        priority: JobPriority,
109    ) -> Result<String, ShaperailError> {
110        self.enqueue_with_options(name, payload, priority, 3, 300)
111            .await
112    }
113
114    /// Enqueues a job with full configuration.
115    ///
116    /// `max_retries` — how many times to retry on failure (default 3).
117    /// `timeout_secs` — auto-fail after this duration (default 300s).
118    pub async fn enqueue_with_options(
119        &self,
120        name: &str,
121        payload: serde_json::Value,
122        priority: JobPriority,
123        max_retries: u32,
124        timeout_secs: u64,
125    ) -> Result<String, ShaperailError> {
126        let id = Uuid::new_v4().to_string();
127        let now = chrono::Utc::now().to_rfc3339();
128
129        let envelope = JobEnvelope {
130            id: id.clone(),
131            name: name.to_string(),
132            payload,
133            priority,
134            max_retries,
135            timeout_secs,
136            attempt: 0,
137        };
138
139        let envelope_json = serde_json::to_string(&envelope)
140            .map_err(|e| ShaperailError::Internal(format!("Failed to serialize job: {e}")))?;
141
142        let mut conn = self
143            .pool
144            .get()
145            .await
146            .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
147
148        // Store job metadata
149        let meta_key = format!("shaperail:jobs:meta:{id}");
150        redis::cmd("HSET")
151            .arg(&meta_key)
152            .arg("id")
153            .arg(&id)
154            .arg("name")
155            .arg(name)
156            .arg("status")
157            .arg(JobStatus::Pending.to_string())
158            .arg("attempt")
159            .arg("0")
160            .arg("max_retries")
161            .arg(max_retries.to_string())
162            .arg("created_at")
163            .arg(&now)
164            .arg("updated_at")
165            .arg(&now)
166            .query_async::<()>(&mut *conn)
167            .await
168            .map_err(|e| ShaperailError::Internal(format!("Failed to store job metadata: {e}")))?;
169
170        // Set TTL on metadata (7 days)
171        let _: Result<(), _> = conn.expire(&meta_key, 604800).await;
172
173        // Push to priority queue
174        conn.rpush::<_, _, ()>(priority.queue_key(), &envelope_json)
175            .await
176            .map_err(|e| ShaperailError::Internal(format!("Failed to enqueue job: {e}")))?;
177
178        tracing::info!(job_id = %id, job_name = name, priority = ?priority, "Job enqueued");
179        Ok(id)
180    }
181
182    /// Retrieves the status/info of a job by ID.
183    pub async fn get_status(&self, job_id: &str) -> Result<JobInfo, ShaperailError> {
184        let mut conn = self
185            .pool
186            .get()
187            .await
188            .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
189
190        let meta_key = format!("shaperail:jobs:meta:{job_id}");
191        let values: Vec<String> = redis::cmd("HGETALL")
192            .arg(&meta_key)
193            .query_async(&mut *conn)
194            .await
195            .map_err(|e| ShaperailError::Internal(format!("Failed to get job status: {e}")))?;
196
197        if values.is_empty() {
198            return Err(ShaperailError::NotFound);
199        }
200
201        // Parse HGETALL flat key-value pairs
202        let mut map = std::collections::HashMap::new();
203        for chunk in values.chunks(2) {
204            if chunk.len() == 2 {
205                map.insert(chunk[0].clone(), chunk[1].clone());
206            }
207        }
208
209        let status = match map.get("status").map(|s| s.as_str()) {
210            Some("pending") => JobStatus::Pending,
211            Some("running") => JobStatus::Running,
212            Some("completed") => JobStatus::Completed,
213            Some("failed") => JobStatus::Failed,
214            _ => JobStatus::Pending,
215        };
216
217        Ok(JobInfo {
218            id: map.get("id").cloned().unwrap_or_default(),
219            name: map.get("name").cloned().unwrap_or_default(),
220            status,
221            attempt: map.get("attempt").and_then(|s| s.parse().ok()).unwrap_or(0),
222            max_retries: map
223                .get("max_retries")
224                .and_then(|s| s.parse().ok())
225                .unwrap_or(3),
226            error: map.get("error").cloned(),
227            created_at: map.get("created_at").cloned().unwrap_or_default(),
228            updated_at: map.get("updated_at").cloned().unwrap_or_default(),
229        })
230    }
231
232    /// Returns the total number of queued jobs across all priority levels.
233    pub async fn total_depth(&self) -> Result<i64, ShaperailError> {
234        let mut conn = self
235            .pool
236            .get()
237            .await
238            .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
239
240        let mut total = 0_i64;
241        for priority in JobPriority::all() {
242            let len: i64 = redis::cmd("LLEN")
243                .arg(priority.queue_key())
244                .query_async(&mut *conn)
245                .await
246                .map_err(|e| {
247                    ShaperailError::Internal(format!("Failed to inspect job queue depth: {e}"))
248                })?;
249            total += len;
250        }
251
252        Ok(total)
253    }
254
255    /// Updates a job's status in the metadata hash.
256    pub(crate) async fn update_status(
257        &self,
258        job_id: &str,
259        status: JobStatus,
260        attempt: u32,
261        error: Option<&str>,
262    ) -> Result<(), ShaperailError> {
263        let mut conn = self
264            .pool
265            .get()
266            .await
267            .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
268
269        let meta_key = format!("shaperail:jobs:meta:{job_id}");
270        let now = chrono::Utc::now().to_rfc3339();
271
272        let mut cmd = redis::cmd("HSET");
273        cmd.arg(&meta_key)
274            .arg("status")
275            .arg(status.to_string())
276            .arg("attempt")
277            .arg(attempt.to_string())
278            .arg("updated_at")
279            .arg(&now);
280
281        if let Some(err_msg) = error {
282            cmd.arg("error").arg(err_msg);
283        }
284
285        cmd.query_async::<()>(&mut *conn)
286            .await
287            .map_err(|e| ShaperailError::Internal(format!("Failed to update job status: {e}")))?;
288
289        Ok(())
290    }
291
292    /// Moves a job to the dead letter queue.
293    pub(crate) async fn move_to_dead_letter(
294        &self,
295        envelope: &JobEnvelope,
296        error: &str,
297    ) -> Result<(), ShaperailError> {
298        let mut conn = self
299            .pool
300            .get()
301            .await
302            .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
303
304        let dead_entry = serde_json::json!({
305            "id": envelope.id,
306            "name": envelope.name,
307            "payload": envelope.payload,
308            "error": error,
309            "attempts": envelope.attempt,
310            "failed_at": chrono::Utc::now().to_rfc3339(),
311        });
312
313        let dead_json = serde_json::to_string(&dead_entry).map_err(|e| {
314            ShaperailError::Internal(format!("Failed to serialize dead letter: {e}"))
315        })?;
316
317        conn.rpush::<_, _, ()>("shaperail:jobs:dead", &dead_json)
318            .await
319            .map_err(|e| {
320                ShaperailError::Internal(format!("Failed to push to dead letter queue: {e}"))
321            })?;
322
323        // Update status
324        self.update_status(
325            &envelope.id,
326            JobStatus::Failed,
327            envelope.attempt,
328            Some(error),
329        )
330        .await?;
331
332        tracing::warn!(
333            job_id = %envelope.id,
334            job_name = %envelope.name,
335            attempts = envelope.attempt,
336            "Job moved to dead letter queue"
337        );
338
339        Ok(())
340    }
341
342    /// Re-enqueues a job for retry with incremented attempt count.
343    pub(crate) async fn requeue_for_retry(
344        &self,
345        mut envelope: JobEnvelope,
346    ) -> Result<(), ShaperailError> {
347        envelope.attempt += 1;
348
349        let mut conn = self
350            .pool
351            .get()
352            .await
353            .map_err(|e| ShaperailError::Internal(format!("Redis connection failed: {e}")))?;
354
355        let envelope_json = serde_json::to_string(&envelope)
356            .map_err(|e| ShaperailError::Internal(format!("Failed to serialize job: {e}")))?;
357
358        // Push back to the same priority queue
359        conn.rpush::<_, _, ()>(envelope.priority.queue_key(), &envelope_json)
360            .await
361            .map_err(|e| ShaperailError::Internal(format!("Failed to requeue job: {e}")))?;
362
363        self.update_status(&envelope.id, JobStatus::Pending, envelope.attempt, None)
364            .await?;
365
366        Ok(())
367    }
368
369    /// Returns a reference to the underlying pool (used by Worker).
370    pub(crate) fn pool(&self) -> &Arc<deadpool_redis::Pool> {
371        &self.pool
372    }
373}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// Each priority maps to its dedicated Redis list key.
    #[test]
    fn priority_queue_keys() {
        let expected = [
            (JobPriority::Critical, "shaperail:jobs:queue:critical"),
            (JobPriority::High, "shaperail:jobs:queue:high"),
            (JobPriority::Normal, "shaperail:jobs:queue:normal"),
            (JobPriority::Low, "shaperail:jobs:queue:low"),
        ];
        for &(priority, key) in &expected {
            assert_eq!(priority.queue_key(), key);
        }
    }

    /// `all()` lists the four priorities from most to least urgent.
    #[test]
    fn priority_all_order() {
        let expected = [
            JobPriority::Critical,
            JobPriority::High,
            JobPriority::Normal,
            JobPriority::Low,
        ];
        let all = JobPriority::all();
        assert_eq!(all.len(), 4);
        for (position, want) in expected.iter().enumerate() {
            assert_eq!(all[position], *want);
        }
    }

    /// `Display` yields the lowercase status names.
    #[test]
    fn job_status_display() {
        let expected = [
            (JobStatus::Pending, "pending"),
            (JobStatus::Running, "running"),
            (JobStatus::Completed, "completed"),
            (JobStatus::Failed, "failed"),
        ];
        for &(status, text) in &expected {
            assert_eq!(status.to_string(), text);
        }
    }

    /// An envelope survives a JSON serialize/deserialize round trip.
    #[test]
    fn job_envelope_serde_roundtrip() {
        let original = JobEnvelope {
            id: "test-id".to_string(),
            name: "send_email".to_string(),
            payload: serde_json::json!({"user_id": "123"}),
            priority: JobPriority::Normal,
            max_retries: 3,
            timeout_secs: 300,
            attempt: 0,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: JobEnvelope = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.id, "test-id");
        assert_eq!(decoded.name, "send_email");
        assert_eq!(decoded.priority, JobPriority::Normal);
        assert_eq!(decoded.max_retries, 3);
        assert_eq!(decoded.timeout_secs, 300);
        assert_eq!(decoded.attempt, 0);
    }

    /// Priorities serialize as lowercase JSON strings and parse back.
    #[test]
    fn job_priority_serde() {
        let encoded = serde_json::to_string(&JobPriority::Critical).unwrap();
        assert_eq!(encoded, "\"critical\"");
        let decoded: JobPriority = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, JobPriority::Critical);
    }

    /// Statuses serialize as lowercase JSON strings and parse back.
    #[test]
    fn job_status_serde() {
        let encoded = serde_json::to_string(&JobStatus::Completed).unwrap();
        assert_eq!(encoded, "\"completed\"");
        let decoded: JobStatus = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, JobStatus::Completed);
    }
}