// forge_runtime/jobs/queue.rs

1use chrono::{DateTime, Utc};
2use forge_core::job::{JobPriority, JobStatus};
3use uuid::Uuid;
4
5/// A job record in the database.
6#[derive(Debug, Clone)]
7pub struct JobRecord {
8    /// Unique job ID.
9    pub id: Uuid,
10    /// Job type/name.
11    pub job_type: String,
12    /// Job input as JSON.
13    pub input: serde_json::Value,
14    /// Job output as JSON (if completed).
15    pub output: Option<serde_json::Value>,
16    /// Current status.
17    pub status: JobStatus,
18    /// Priority level.
19    pub priority: i32,
20    /// Number of attempts made.
21    pub attempts: i32,
22    /// Maximum attempts allowed.
23    pub max_attempts: i32,
24    /// Last error message.
25    pub last_error: Option<String>,
26    /// Required worker capability.
27    pub worker_capability: Option<String>,
28    /// Worker ID that claimed the job.
29    pub worker_id: Option<Uuid>,
30    /// Idempotency key for deduplication.
31    pub idempotency_key: Option<String>,
32    /// When the job is scheduled to run.
33    pub scheduled_at: DateTime<Utc>,
34    /// When the job was created.
35    pub created_at: DateTime<Utc>,
36    /// When the job was claimed.
37    pub claimed_at: Option<DateTime<Utc>>,
38    /// When the job started running.
39    pub started_at: Option<DateTime<Utc>>,
40    /// When the job completed.
41    pub completed_at: Option<DateTime<Utc>>,
42    /// When the job failed.
43    pub failed_at: Option<DateTime<Utc>>,
44    /// Last heartbeat time.
45    pub last_heartbeat: Option<DateTime<Utc>>,
46}
47
48impl JobRecord {
49    /// Create a new job record.
50    pub fn new(
51        job_type: impl Into<String>,
52        input: serde_json::Value,
53        priority: JobPriority,
54        max_attempts: i32,
55    ) -> Self {
56        Self {
57            id: Uuid::new_v4(),
58            job_type: job_type.into(),
59            input,
60            output: None,
61            status: JobStatus::Pending,
62            priority: priority.as_i32(),
63            attempts: 0,
64            max_attempts,
65            last_error: None,
66            worker_capability: None,
67            worker_id: None,
68            idempotency_key: None,
69            scheduled_at: Utc::now(),
70            created_at: Utc::now(),
71            claimed_at: None,
72            started_at: None,
73            completed_at: None,
74            failed_at: None,
75            last_heartbeat: None,
76        }
77    }
78
79    /// Set worker capability requirement.
80    pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
81        self.worker_capability = Some(capability.into());
82        self
83    }
84
85    /// Set scheduled time.
86    pub fn with_scheduled_at(mut self, at: DateTime<Utc>) -> Self {
87        self.scheduled_at = at;
88        self
89    }
90
91    /// Set idempotency key.
92    pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
93        self.idempotency_key = Some(key.into());
94        self
95    }
96}
97
98/// Job queue operations.
99#[derive(Clone)]
100pub struct JobQueue {
101    pool: sqlx::PgPool,
102}
103
104impl JobQueue {
105    /// Create a new job queue.
106    pub fn new(pool: sqlx::PgPool) -> Self {
107        Self { pool }
108    }
109
110    /// Enqueue a new job.
111    pub async fn enqueue(&self, job: JobRecord) -> Result<Uuid, sqlx::Error> {
112        // Check for duplicate if idempotency key is set
113        if let Some(ref key) = job.idempotency_key {
114            let existing: Option<(Uuid,)> = sqlx::query_as(
115                r#"
116                SELECT id FROM forge_jobs
117                WHERE idempotency_key = $1
118                  AND status NOT IN ('completed', 'failed', 'dead_letter')
119                "#,
120            )
121            .bind(key)
122            .fetch_optional(&self.pool)
123            .await?;
124
125            if let Some((id,)) = existing {
126                return Ok(id); // Return existing job ID
127            }
128        }
129
130        sqlx::query(
131            r#"
132            INSERT INTO forge_jobs (
133                id, job_type, input, status, priority, attempts, max_attempts,
134                worker_capability, idempotency_key, scheduled_at, created_at
135            ) VALUES (
136                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
137            )
138            "#,
139        )
140        .bind(job.id)
141        .bind(&job.job_type)
142        .bind(&job.input)
143        .bind(job.status.as_str())
144        .bind(job.priority)
145        .bind(job.attempts)
146        .bind(job.max_attempts)
147        .bind(&job.worker_capability)
148        .bind(&job.idempotency_key)
149        .bind(job.scheduled_at)
150        .bind(job.created_at)
151        .execute(&self.pool)
152        .await?;
153
154        Ok(job.id)
155    }
156
157    /// Claim jobs using SKIP LOCKED pattern.
158    pub async fn claim(
159        &self,
160        worker_id: Uuid,
161        capabilities: &[String],
162        limit: i32,
163    ) -> Result<Vec<JobRecord>, sqlx::Error> {
164        let rows = sqlx::query(
165            r#"
166            WITH claimable AS (
167                SELECT id
168                FROM forge_jobs
169                WHERE status = 'pending'
170                  AND scheduled_at <= NOW()
171                  AND (worker_capability = ANY($2) OR worker_capability IS NULL)
172                ORDER BY priority DESC, scheduled_at ASC
173                LIMIT $3
174                FOR UPDATE SKIP LOCKED
175            )
176            UPDATE forge_jobs
177            SET
178                status = 'claimed',
179                worker_id = $1,
180                claimed_at = NOW(),
181                attempts = attempts + 1
182            WHERE id IN (SELECT id FROM claimable)
183            RETURNING
184                id, job_type, input, output, status, priority,
185                attempts, max_attempts, last_error, worker_capability,
186                worker_id, idempotency_key, scheduled_at, created_at,
187                claimed_at, started_at, completed_at, failed_at, last_heartbeat
188            "#,
189        )
190        .bind(worker_id)
191        .bind(capabilities)
192        .bind(limit)
193        .fetch_all(&self.pool)
194        .await?;
195
196        let jobs = rows
197            .iter()
198            .map(|row| {
199                use sqlx::Row;
200                JobRecord {
201                    id: row.get("id"),
202                    job_type: row.get("job_type"),
203                    input: row.get("input"),
204                    output: row.get("output"),
205                    status: row.get::<String, _>("status").parse().unwrap(),
206                    priority: row.get("priority"),
207                    attempts: row.get("attempts"),
208                    max_attempts: row.get("max_attempts"),
209                    last_error: row.get("last_error"),
210                    worker_capability: row.get("worker_capability"),
211                    worker_id: row.get("worker_id"),
212                    idempotency_key: row.get("idempotency_key"),
213                    scheduled_at: row.get("scheduled_at"),
214                    created_at: row.get("created_at"),
215                    claimed_at: row.get("claimed_at"),
216                    started_at: row.get("started_at"),
217                    completed_at: row.get("completed_at"),
218                    failed_at: row.get("failed_at"),
219                    last_heartbeat: row.get("last_heartbeat"),
220                }
221            })
222            .collect();
223
224        Ok(jobs)
225    }
226
227    /// Mark job as running.
228    pub async fn start(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
229        sqlx::query(
230            r#"
231            UPDATE forge_jobs
232            SET status = 'running', started_at = NOW()
233            WHERE id = $1
234            "#,
235        )
236        .bind(job_id)
237        .execute(&self.pool)
238        .await?;
239
240        Ok(())
241    }
242
243    /// Mark job as completed.
244    pub async fn complete(
245        &self,
246        job_id: Uuid,
247        output: serde_json::Value,
248    ) -> Result<(), sqlx::Error> {
249        sqlx::query(
250            r#"
251            UPDATE forge_jobs
252            SET
253                status = 'completed',
254                output = $2,
255                completed_at = NOW()
256            WHERE id = $1
257            "#,
258        )
259        .bind(job_id)
260        .bind(output)
261        .execute(&self.pool)
262        .await?;
263
264        Ok(())
265    }
266
267    /// Mark job as failed, schedule retry or move to dead letter.
268    pub async fn fail(
269        &self,
270        job_id: Uuid,
271        error: &str,
272        retry_delay: Option<chrono::Duration>,
273    ) -> Result<(), sqlx::Error> {
274        if let Some(delay) = retry_delay {
275            // Schedule retry
276            sqlx::query(
277                r#"
278                UPDATE forge_jobs
279                SET
280                    status = 'pending',
281                    worker_id = NULL,
282                    claimed_at = NULL,
283                    started_at = NULL,
284                    last_error = $2,
285                    scheduled_at = NOW() + $3
286                WHERE id = $1
287                "#,
288            )
289            .bind(job_id)
290            .bind(error)
291            .bind(delay)
292            .execute(&self.pool)
293            .await?;
294        } else {
295            // Move to dead letter
296            sqlx::query(
297                r#"
298                UPDATE forge_jobs
299                SET
300                    status = 'dead_letter',
301                    last_error = $2,
302                    failed_at = NOW()
303                WHERE id = $1
304                "#,
305            )
306            .bind(job_id)
307            .bind(error)
308            .execute(&self.pool)
309            .await?;
310        }
311
312        Ok(())
313    }
314
315    /// Update heartbeat for a running job.
316    pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
317        sqlx::query(
318            r#"
319            UPDATE forge_jobs
320            SET last_heartbeat = NOW()
321            WHERE id = $1
322            "#,
323        )
324        .bind(job_id)
325        .execute(&self.pool)
326        .await?;
327
328        Ok(())
329    }
330
331    /// Update job progress.
332    pub async fn update_progress(
333        &self,
334        job_id: Uuid,
335        percent: i32,
336        message: &str,
337    ) -> Result<(), sqlx::Error> {
338        sqlx::query(
339            r#"
340            UPDATE forge_jobs
341            SET progress_percent = $2, progress_message = $3, last_heartbeat = NOW()
342            WHERE id = $1
343            "#,
344        )
345        .bind(job_id)
346        .bind(percent)
347        .bind(message)
348        .execute(&self.pool)
349        .await?;
350
351        Ok(())
352    }
353
354    /// Release stale jobs back to pending.
355    pub async fn release_stale(
356        &self,
357        stale_threshold: chrono::Duration,
358    ) -> Result<u64, sqlx::Error> {
359        let result = sqlx::query(
360            r#"
361            UPDATE forge_jobs
362            SET
363                status = 'pending',
364                worker_id = NULL,
365                claimed_at = NULL
366            WHERE status IN ('claimed', 'running')
367              AND claimed_at < NOW() - $1
368            "#,
369        )
370        .bind(stale_threshold)
371        .execute(&self.pool)
372        .await?;
373
374        Ok(result.rows_affected())
375    }
376
377    /// Get queue statistics.
378    pub async fn stats(&self) -> Result<QueueStats, sqlx::Error> {
379        let row = sqlx::query(
380            r#"
381            SELECT
382                COUNT(*) FILTER (WHERE status = 'pending') as pending,
383                COUNT(*) FILTER (WHERE status = 'claimed') as claimed,
384                COUNT(*) FILTER (WHERE status = 'running') as running,
385                COUNT(*) FILTER (WHERE status = 'completed') as completed,
386                COUNT(*) FILTER (WHERE status = 'failed') as failed,
387                COUNT(*) FILTER (WHERE status = 'dead_letter') as dead_letter
388            FROM forge_jobs
389            "#,
390        )
391        .fetch_one(&self.pool)
392        .await?;
393
394        use sqlx::Row;
395        Ok(QueueStats {
396            pending: row.get::<i64, _>("pending") as u64,
397            claimed: row.get::<i64, _>("claimed") as u64,
398            running: row.get::<i64, _>("running") as u64,
399            completed: row.get::<i64, _>("completed") as u64,
400            failed: row.get::<i64, _>("failed") as u64,
401            dead_letter: row.get::<i64, _>("dead_letter") as u64,
402        })
403    }
404}
405
/// Number of jobs in the queue, broken down by status.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Jobs with status `pending`.
    pub pending: u64,
    /// Jobs with status `claimed`.
    pub claimed: u64,
    /// Jobs with status `running`.
    pub running: u64,
    /// Jobs with status `completed`.
    pub completed: u64,
    /// Jobs with status `failed`.
    pub failed: u64,
    /// Jobs with status `dead_letter`.
    pub dead_letter: u64,
}
416
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a record with an empty JSON object as input.
    fn make_job(job_type: &str, priority: JobPriority, max_attempts: i32) -> JobRecord {
        JobRecord::new(job_type, serde_json::json!({}), priority, max_attempts)
    }

    /// A fresh record is pending, has no attempts, and keeps the given limits.
    #[test]
    fn test_job_record_creation() {
        let created = make_job("send_email", JobPriority::Normal, 3);

        assert_eq!(created.job_type, "send_email");
        assert_eq!(created.status, JobStatus::Pending);
        assert_eq!(created.priority, 50);
        assert_eq!(created.attempts, 0);
        assert_eq!(created.max_attempts, 3);
    }

    /// `with_capability` stores the capability and leaves the priority intact.
    #[test]
    fn test_job_record_with_capability() {
        let transcode = make_job("transcode", JobPriority::High, 3).with_capability("media");

        assert_eq!(transcode.worker_capability.as_deref(), Some("media"));
        assert_eq!(transcode.priority, 75);
    }

    /// `with_idempotency_key` stores the key on the record.
    #[test]
    fn test_job_record_with_idempotency() {
        let payment =
            make_job("payment", JobPriority::Critical, 5).with_idempotency_key("payment-123");

        assert_eq!(payment.idempotency_key.as_deref(), Some("payment-123"));
    }
}