Skip to main content

forge_runtime/jobs/
queue.rs

1use chrono::{DateTime, Utc};
2use forge_core::job::{JobPriority, JobStatus};
3use uuid::Uuid;
4
5/// A job record in the database.
6#[derive(Debug, Clone)]
7pub struct JobRecord {
8    /// Unique job ID.
9    pub id: Uuid,
10    /// Job type/name.
11    pub job_type: String,
12    /// Job input as JSON.
13    pub input: serde_json::Value,
14    /// Job output as JSON (if completed).
15    pub output: Option<serde_json::Value>,
16    /// Persisted context data.
17    pub job_context: serde_json::Value,
18    /// Current status.
19    pub status: JobStatus,
20    /// Priority level.
21    pub priority: i32,
22    /// Number of attempts made.
23    pub attempts: i32,
24    /// Maximum attempts allowed.
25    pub max_attempts: i32,
26    /// Last error message.
27    pub last_error: Option<String>,
28    /// Required worker capability.
29    pub worker_capability: Option<String>,
30    /// Worker ID that claimed the job.
31    pub worker_id: Option<Uuid>,
32    /// Idempotency key for deduplication.
33    pub idempotency_key: Option<String>,
34    /// When the job is scheduled to run.
35    pub scheduled_at: DateTime<Utc>,
36    /// When the job was created.
37    pub created_at: DateTime<Utc>,
38    /// When the job was claimed.
39    pub claimed_at: Option<DateTime<Utc>>,
40    /// When the job started running.
41    pub started_at: Option<DateTime<Utc>>,
42    /// When the job completed.
43    pub completed_at: Option<DateTime<Utc>>,
44    /// When the job failed.
45    pub failed_at: Option<DateTime<Utc>>,
46    /// Last heartbeat time.
47    pub last_heartbeat: Option<DateTime<Utc>>,
48    /// Cancellation requested at.
49    pub cancel_requested_at: Option<DateTime<Utc>>,
50    /// Cancellation completed at.
51    pub cancelled_at: Option<DateTime<Utc>>,
52    /// Cancellation reason.
53    pub cancel_reason: Option<String>,
54}
55
56impl JobRecord {
57    /// Create a new job record.
58    pub fn new(
59        job_type: impl Into<String>,
60        input: serde_json::Value,
61        priority: JobPriority,
62        max_attempts: i32,
63    ) -> Self {
64        Self {
65            id: Uuid::new_v4(),
66            job_type: job_type.into(),
67            input,
68            output: None,
69            job_context: serde_json::json!({}),
70            status: JobStatus::Pending,
71            priority: priority.as_i32(),
72            attempts: 0,
73            max_attempts,
74            last_error: None,
75            worker_capability: None,
76            worker_id: None,
77            idempotency_key: None,
78            scheduled_at: Utc::now(),
79            created_at: Utc::now(),
80            claimed_at: None,
81            started_at: None,
82            completed_at: None,
83            failed_at: None,
84            last_heartbeat: None,
85            cancel_requested_at: None,
86            cancelled_at: None,
87            cancel_reason: None,
88        }
89    }
90
91    /// Set worker capability requirement.
92    pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
93        self.worker_capability = Some(capability.into());
94        self
95    }
96
97    /// Set scheduled time.
98    pub fn with_scheduled_at(mut self, at: DateTime<Utc>) -> Self {
99        self.scheduled_at = at;
100        self
101    }
102
103    /// Set idempotency key.
104    pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
105        self.idempotency_key = Some(key.into());
106        self
107    }
108}
109
110/// Job queue operations.
111#[derive(Clone)]
112pub struct JobQueue {
113    pool: sqlx::PgPool,
114}
115
116impl JobQueue {
117    /// Create a new job queue.
118    pub fn new(pool: sqlx::PgPool) -> Self {
119        Self { pool }
120    }
121
122    /// Enqueue a new job.
123    pub async fn enqueue(&self, job: JobRecord) -> Result<Uuid, sqlx::Error> {
124        // Check for duplicate if idempotency key is set
125        if let Some(ref key) = job.idempotency_key {
126            let existing: Option<(Uuid,)> = sqlx::query_as(
127                r#"
128                SELECT id FROM forge_jobs
129                WHERE idempotency_key = $1
130                  AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
131                "#,
132            )
133            .bind(key)
134            .fetch_optional(&self.pool)
135            .await?;
136
137            if let Some((id,)) = existing {
138                return Ok(id); // Return existing job ID
139            }
140        }
141
142        sqlx::query(
143            r#"
144            INSERT INTO forge_jobs (
145                id, job_type, input, job_context, status, priority, attempts, max_attempts,
146                worker_capability, idempotency_key, scheduled_at, created_at
147            ) VALUES (
148                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12
149            )
150            "#,
151        )
152        .bind(job.id)
153        .bind(&job.job_type)
154        .bind(&job.input)
155        .bind(&job.job_context)
156        .bind(job.status.as_str())
157        .bind(job.priority)
158        .bind(job.attempts)
159        .bind(job.max_attempts)
160        .bind(&job.worker_capability)
161        .bind(&job.idempotency_key)
162        .bind(job.scheduled_at)
163        .bind(job.created_at)
164        .execute(&self.pool)
165        .await?;
166
167        Ok(job.id)
168    }
169
170    /// Claim jobs using SKIP LOCKED pattern.
171    pub async fn claim(
172        &self,
173        worker_id: Uuid,
174        capabilities: &[String],
175        limit: i32,
176    ) -> Result<Vec<JobRecord>, sqlx::Error> {
177        let rows = sqlx::query(
178            r#"
179            WITH claimable AS (
180                SELECT id
181                FROM forge_jobs
182                WHERE status = 'pending'
183                  AND scheduled_at <= NOW()
184                  AND (worker_capability = ANY($2) OR worker_capability IS NULL)
185                ORDER BY priority DESC, scheduled_at ASC
186                LIMIT $3
187                FOR UPDATE SKIP LOCKED
188            )
189            UPDATE forge_jobs
190            SET
191                status = 'claimed',
192                worker_id = $1,
193                claimed_at = NOW(),
194                attempts = attempts + 1
195            WHERE id IN (SELECT id FROM claimable)
196            RETURNING
197                id, job_type, input, output, job_context, status, priority,
198                attempts, max_attempts, last_error, worker_capability,
199                worker_id, idempotency_key, scheduled_at, created_at,
200                claimed_at, started_at, completed_at, failed_at, last_heartbeat,
201                cancel_requested_at, cancelled_at, cancel_reason
202            "#,
203        )
204        .bind(worker_id)
205        .bind(capabilities)
206        .bind(limit)
207        .fetch_all(&self.pool)
208        .await?;
209
210        let jobs = rows
211            .iter()
212            .map(|row| {
213                use sqlx::Row;
214                JobRecord {
215                    id: row.get("id"),
216                    job_type: row.get("job_type"),
217                    input: row.get("input"),
218                    output: row.get("output"),
219                    job_context: row.get("job_context"),
220                    status: row.get::<String, _>("status").parse().unwrap(),
221                    priority: row.get("priority"),
222                    attempts: row.get("attempts"),
223                    max_attempts: row.get("max_attempts"),
224                    last_error: row.get("last_error"),
225                    worker_capability: row.get("worker_capability"),
226                    worker_id: row.get("worker_id"),
227                    idempotency_key: row.get("idempotency_key"),
228                    scheduled_at: row.get("scheduled_at"),
229                    created_at: row.get("created_at"),
230                    claimed_at: row.get("claimed_at"),
231                    started_at: row.get("started_at"),
232                    completed_at: row.get("completed_at"),
233                    failed_at: row.get("failed_at"),
234                    last_heartbeat: row.get("last_heartbeat"),
235                    cancel_requested_at: row.get("cancel_requested_at"),
236                    cancelled_at: row.get("cancelled_at"),
237                    cancel_reason: row.get("cancel_reason"),
238                }
239            })
240            .collect();
241
242        Ok(jobs)
243    }
244
245    /// Mark job as running.
246    pub async fn start(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
247        let result = sqlx::query(
248            r#"
249            UPDATE forge_jobs
250            SET status = 'running', started_at = NOW()
251            WHERE id = $1
252              AND status NOT IN ('cancel_requested', 'cancelled')
253            "#,
254        )
255        .bind(job_id)
256        .execute(&self.pool)
257        .await?;
258
259        if result.rows_affected() == 0 {
260            return Err(sqlx::Error::RowNotFound);
261        }
262
263        Ok(())
264    }
265
266    /// Mark job as completed.
267    ///
268    /// If `ttl` is provided, sets `expires_at` for automatic cleanup.
269    pub async fn complete(
270        &self,
271        job_id: Uuid,
272        output: serde_json::Value,
273        ttl: Option<std::time::Duration>,
274    ) -> Result<(), sqlx::Error> {
275        let expires_at = ttl.map(|d| {
276            chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
277        });
278
279        sqlx::query(
280            r#"
281            UPDATE forge_jobs
282            SET
283                status = 'completed',
284                output = $2,
285                completed_at = NOW(),
286                cancel_requested_at = NULL,
287                cancelled_at = NULL,
288                cancel_reason = NULL,
289                expires_at = $3
290            WHERE id = $1
291            "#,
292        )
293        .bind(job_id)
294        .bind(output)
295        .bind(expires_at)
296        .execute(&self.pool)
297        .await?;
298
299        Ok(())
300    }
301
302    /// Mark job as failed, schedule retry or move to dead letter.
303    ///
304    /// If `ttl` is provided and job moves to dead_letter, sets `expires_at` for automatic cleanup.
305    pub async fn fail(
306        &self,
307        job_id: Uuid,
308        error: &str,
309        retry_delay: Option<chrono::Duration>,
310        ttl: Option<std::time::Duration>,
311    ) -> Result<(), sqlx::Error> {
312        if let Some(delay) = retry_delay {
313            // Schedule retry
314            sqlx::query(
315                r#"
316                UPDATE forge_jobs
317                SET
318                    status = 'pending',
319                    worker_id = NULL,
320                    claimed_at = NULL,
321                    started_at = NULL,
322                    last_error = $2,
323                    scheduled_at = NOW() + $3,
324                    cancel_requested_at = NULL,
325                    cancelled_at = NULL,
326                    cancel_reason = NULL
327                WHERE id = $1
328                "#,
329            )
330            .bind(job_id)
331            .bind(error)
332            .bind(delay)
333            .execute(&self.pool)
334            .await?;
335        } else {
336            // Move to dead letter
337            let expires_at = ttl.map(|d| {
338                chrono::Utc::now()
339                    + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
340            });
341
342            sqlx::query(
343                r#"
344                UPDATE forge_jobs
345                SET
346                    status = 'dead_letter',
347                    last_error = $2,
348                    failed_at = NOW(),
349                    cancel_requested_at = NULL,
350                    cancelled_at = NULL,
351                    cancel_reason = NULL,
352                    expires_at = $3
353                WHERE id = $1
354                "#,
355            )
356            .bind(job_id)
357            .bind(error)
358            .bind(expires_at)
359            .execute(&self.pool)
360            .await?;
361        }
362
363        Ok(())
364    }
365
366    /// Update heartbeat for a running job.
367    pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
368        sqlx::query(
369            r#"
370            UPDATE forge_jobs
371            SET last_heartbeat = NOW()
372            WHERE id = $1
373            "#,
374        )
375        .bind(job_id)
376        .execute(&self.pool)
377        .await?;
378
379        Ok(())
380    }
381
382    /// Update job progress.
383    pub async fn update_progress(
384        &self,
385        job_id: Uuid,
386        percent: i32,
387        message: &str,
388    ) -> Result<(), sqlx::Error> {
389        sqlx::query(
390            r#"
391            UPDATE forge_jobs
392            SET progress_percent = $2, progress_message = $3, last_heartbeat = NOW()
393            WHERE id = $1
394            "#,
395        )
396        .bind(job_id)
397        .bind(percent)
398        .bind(message)
399        .execute(&self.pool)
400        .await?;
401
402        Ok(())
403    }
404
405    /// Replace persisted job context.
406    pub async fn set_context(
407        &self,
408        job_id: Uuid,
409        context: serde_json::Value,
410    ) -> Result<(), sqlx::Error> {
411        sqlx::query(
412            r#"
413            UPDATE forge_jobs
414            SET job_context = $2
415            WHERE id = $1
416            "#,
417        )
418        .bind(job_id)
419        .bind(context)
420        .execute(&self.pool)
421        .await?;
422
423        Ok(())
424    }
425
426    /// Request cancellation for a job.
427    pub async fn request_cancel(
428        &self,
429        job_id: Uuid,
430        reason: Option<&str>,
431    ) -> Result<bool, sqlx::Error> {
432        let row: Option<(String,)> = sqlx::query_as(
433            r#"
434            SELECT status
435            FROM forge_jobs
436            WHERE id = $1
437            "#,
438        )
439        .bind(job_id)
440        .fetch_optional(&self.pool)
441        .await?;
442
443        let status = match row {
444            Some((status,)) => status,
445            None => return Ok(false),
446        };
447
448        let terminal_statuses = [
449            JobStatus::Completed.as_str(),
450            JobStatus::Failed.as_str(),
451            JobStatus::DeadLetter.as_str(),
452            JobStatus::Cancelled.as_str(),
453        ];
454
455        if status == JobStatus::Running.as_str() {
456            let updated = sqlx::query(
457                r#"
458                UPDATE forge_jobs
459                SET
460                    status = 'cancel_requested',
461                    cancel_requested_at = NOW(),
462                    cancel_reason = COALESCE($2, cancel_reason)
463                WHERE id = $1
464                  AND status = 'running'
465                "#,
466            )
467            .bind(job_id)
468            .bind(reason)
469            .execute(&self.pool)
470            .await?;
471
472            return Ok(updated.rows_affected() > 0);
473        }
474
475        if terminal_statuses.contains(&status.as_str()) {
476            return Ok(false);
477        }
478
479        let updated = sqlx::query(
480            r#"
481            UPDATE forge_jobs
482            SET
483                status = 'cancelled',
484                cancelled_at = NOW(),
485                cancel_reason = COALESCE($2, cancel_reason)
486            WHERE id = $1
487              AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
488            "#,
489        )
490        .bind(job_id)
491        .bind(reason)
492        .execute(&self.pool)
493        .await?;
494
495        Ok(updated.rows_affected() > 0)
496    }
497
498    /// Mark job as cancelled.
499    ///
500    /// If `ttl` is provided, sets `expires_at` for automatic cleanup.
501    pub async fn cancel(
502        &self,
503        job_id: Uuid,
504        reason: Option<&str>,
505        ttl: Option<std::time::Duration>,
506    ) -> Result<(), sqlx::Error> {
507        let expires_at = ttl.map(|d| {
508            chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
509        });
510
511        sqlx::query(
512            r#"
513            UPDATE forge_jobs
514            SET
515                status = 'cancelled',
516                cancelled_at = NOW(),
517                cancel_reason = COALESCE($2, cancel_reason),
518                expires_at = $3
519            WHERE id = $1
520            "#,
521        )
522        .bind(job_id)
523        .bind(reason)
524        .bind(expires_at)
525        .execute(&self.pool)
526        .await?;
527
528        Ok(())
529    }
530
531    /// Release stale jobs back to pending.
532    pub async fn release_stale(
533        &self,
534        stale_threshold: chrono::Duration,
535    ) -> Result<u64, sqlx::Error> {
536        let result = sqlx::query(
537            r#"
538            UPDATE forge_jobs
539            SET
540                status = 'pending',
541                worker_id = NULL,
542                claimed_at = NULL
543            WHERE status IN ('claimed', 'running')
544              AND claimed_at < NOW() - $1
545            "#,
546        )
547        .bind(stale_threshold)
548        .execute(&self.pool)
549        .await?;
550
551        Ok(result.rows_affected())
552    }
553
554    /// Delete expired job records.
555    ///
556    /// Only deletes terminal jobs (completed, cancelled, failed, dead_letter)
557    /// that have passed their TTL.
558    pub async fn cleanup_expired(&self) -> Result<u64, sqlx::Error> {
559        let result = sqlx::query(
560            r#"
561            DELETE FROM forge_jobs
562            WHERE expires_at IS NOT NULL
563              AND expires_at < NOW()
564              AND status IN ('completed', 'cancelled', 'failed', 'dead_letter')
565            "#,
566        )
567        .execute(&self.pool)
568        .await?;
569
570        Ok(result.rows_affected())
571    }
572
573    /// Get queue statistics.
574    pub async fn stats(&self) -> Result<QueueStats, sqlx::Error> {
575        let row = sqlx::query(
576            r#"
577            SELECT
578                COUNT(*) FILTER (WHERE status = 'pending') as pending,
579                COUNT(*) FILTER (WHERE status = 'claimed') as claimed,
580                COUNT(*) FILTER (WHERE status = 'running') as running,
581                COUNT(*) FILTER (WHERE status = 'completed') as completed,
582                COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled,
583                COUNT(*) FILTER (WHERE status = 'failed') as failed,
584                COUNT(*) FILTER (WHERE status = 'dead_letter') as dead_letter
585            FROM forge_jobs
586            "#,
587        )
588        .fetch_one(&self.pool)
589        .await?;
590
591        use sqlx::Row;
592        Ok(QueueStats {
593            pending: row.get::<i64, _>("pending") as u64,
594            claimed: row.get::<i64, _>("claimed") as u64,
595            running: row.get::<i64, _>("running") as u64,
596            completed: row.get::<i64, _>("completed") as u64,
597            cancelled: row.get::<i64, _>("cancelled") as u64,
598            failed: row.get::<i64, _>("failed") as u64,
599            dead_letter: row.get::<i64, _>("dead_letter") as u64,
600        })
601    }
602}
603
/// Queue statistics.
///
/// Each field is the number of jobs currently in the status of the same
/// name. The default value has every count at zero. `PartialEq`/`Eq` are
/// derived so snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Jobs in `pending` status.
    pub pending: u64,
    /// Jobs in `claimed` status.
    pub claimed: u64,
    /// Jobs in `running` status.
    pub running: u64,
    /// Jobs in `completed` status.
    pub completed: u64,
    /// Jobs in `cancelled` status.
    pub cancelled: u64,
    /// Jobs in `failed` status.
    pub failed: u64,
    /// Jobs in `dead_letter` status.
    pub dead_letter: u64,
}
615
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a record whose input is an empty JSON object.
    fn record(job_type: &str, priority: JobPriority, max_attempts: i32) -> JobRecord {
        JobRecord::new(job_type, serde_json::json!({}), priority, max_attempts)
    }

    // A fresh record is pending, has made no attempts and carries the
    // numeric value of the requested priority.
    #[test]
    fn test_job_record_creation() {
        let created = record("send_email", JobPriority::Normal, 3);

        assert_eq!(created.job_type, "send_email");
        assert_eq!(created.status, JobStatus::Pending);
        assert_eq!(created.priority, 50);
        assert_eq!(created.attempts, 0);
        assert_eq!(created.max_attempts, 3);
    }

    // The capability builder stores the requirement and leaves priority alone.
    #[test]
    fn test_job_record_with_capability() {
        let transcode = record("transcode", JobPriority::High, 3).with_capability("media");

        assert_eq!(transcode.worker_capability, Some(String::from("media")));
        assert_eq!(transcode.priority, 75);
    }

    // The idempotency builder stores the key verbatim.
    #[test]
    fn test_job_record_with_idempotency() {
        let payment =
            record("payment", JobPriority::Critical, 5).with_idempotency_key("payment-123");

        assert_eq!(payment.idempotency_key, Some(String::from("payment-123")));
    }
}