Skip to main content

forge_runtime/jobs/
queue.rs

1use chrono::{DateTime, Utc};
2use forge_core::job::{JobPriority, JobStatus};
3use uuid::Uuid;
4
5/// A job record in the database.
6#[derive(Debug, Clone)]
7pub struct JobRecord {
8    /// Unique job ID.
9    pub id: Uuid,
10    /// Job type/name.
11    pub job_type: String,
12    /// Job input as JSON.
13    pub input: serde_json::Value,
14    /// Job output as JSON (if completed).
15    pub output: Option<serde_json::Value>,
16    /// Persisted context data.
17    pub job_context: serde_json::Value,
18    /// Current status.
19    pub status: JobStatus,
20    /// Priority level.
21    pub priority: i32,
22    /// Number of attempts made.
23    pub attempts: i32,
24    /// Maximum attempts allowed.
25    pub max_attempts: i32,
26    /// Last error message.
27    pub last_error: Option<String>,
28    /// Required worker capability.
29    pub worker_capability: Option<String>,
30    /// Worker ID that claimed the job.
31    pub worker_id: Option<Uuid>,
32    /// Idempotency key for deduplication.
33    pub idempotency_key: Option<String>,
34    /// Principal that created the job (for access control).
35    pub owner_subject: Option<String>,
36    /// When the job is scheduled to run.
37    pub scheduled_at: DateTime<Utc>,
38    /// When the job was created.
39    pub created_at: DateTime<Utc>,
40    /// When the job was claimed.
41    pub claimed_at: Option<DateTime<Utc>>,
42    /// When the job started running.
43    pub started_at: Option<DateTime<Utc>>,
44    /// When the job completed.
45    pub completed_at: Option<DateTime<Utc>>,
46    /// When the job failed.
47    pub failed_at: Option<DateTime<Utc>>,
48    /// Last heartbeat time.
49    pub last_heartbeat: Option<DateTime<Utc>>,
50    /// Cancellation requested at.
51    pub cancel_requested_at: Option<DateTime<Utc>>,
52    /// Cancellation completed at.
53    pub cancelled_at: Option<DateTime<Utc>>,
54    /// Cancellation reason.
55    pub cancel_reason: Option<String>,
56}
57
58impl JobRecord {
59    /// Create a new job record.
60    pub fn new(
61        job_type: impl Into<String>,
62        input: serde_json::Value,
63        priority: JobPriority,
64        max_attempts: i32,
65    ) -> Self {
66        Self {
67            id: Uuid::new_v4(),
68            job_type: job_type.into(),
69            input,
70            output: None,
71            job_context: serde_json::json!({}),
72            status: JobStatus::Pending,
73            priority: priority.as_i32(),
74            attempts: 0,
75            max_attempts,
76            last_error: None,
77            worker_capability: None,
78            worker_id: None,
79            idempotency_key: None,
80            owner_subject: None,
81            scheduled_at: Utc::now(),
82            created_at: Utc::now(),
83            claimed_at: None,
84            started_at: None,
85            completed_at: None,
86            failed_at: None,
87            last_heartbeat: None,
88            cancel_requested_at: None,
89            cancelled_at: None,
90            cancel_reason: None,
91        }
92    }
93
94    /// Set worker capability requirement.
95    pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
96        self.worker_capability = Some(capability.into());
97        self
98    }
99
100    /// Set scheduled time.
101    pub fn with_scheduled_at(mut self, at: DateTime<Utc>) -> Self {
102        self.scheduled_at = at;
103        self
104    }
105
106    /// Set idempotency key.
107    pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
108        self.idempotency_key = Some(key.into());
109        self
110    }
111
112    /// Set owner subject.
113    pub fn with_owner_subject(mut self, owner_subject: Option<String>) -> Self {
114        self.owner_subject = owner_subject;
115        self
116    }
117}
118
119/// Job queue operations.
120#[derive(Clone)]
121pub struct JobQueue {
122    pool: sqlx::PgPool,
123}
124
125impl JobQueue {
126    /// Create a new job queue.
127    pub fn new(pool: sqlx::PgPool) -> Self {
128        Self { pool }
129    }
130
131    /// Enqueue a new job.
132    pub async fn enqueue(&self, job: JobRecord) -> Result<Uuid, sqlx::Error> {
133        // Check for duplicate if idempotency key is set
134        if let Some(ref key) = job.idempotency_key {
135            let existing: Option<(Uuid,)> = sqlx::query_as(
136                r#"
137                SELECT id FROM forge_jobs
138                WHERE idempotency_key = $1
139                  AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
140                "#,
141            )
142            .bind(key)
143            .fetch_optional(&self.pool)
144            .await?;
145
146            if let Some((id,)) = existing {
147                return Ok(id); // Return existing job ID
148            }
149        }
150
151        sqlx::query(
152            r#"
153            INSERT INTO forge_jobs (
154                id, job_type, input, job_context, status, priority, attempts, max_attempts,
155                worker_capability, idempotency_key, owner_subject, scheduled_at, created_at
156            ) VALUES (
157                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
158            )
159            "#,
160        )
161        .bind(job.id)
162        .bind(&job.job_type)
163        .bind(&job.input)
164        .bind(&job.job_context)
165        .bind(job.status.as_str())
166        .bind(job.priority)
167        .bind(job.attempts)
168        .bind(job.max_attempts)
169        .bind(&job.worker_capability)
170        .bind(&job.idempotency_key)
171        .bind(&job.owner_subject)
172        .bind(job.scheduled_at)
173        .bind(job.created_at)
174        .execute(&self.pool)
175        .await?;
176
177        Ok(job.id)
178    }
179
180    /// Claim jobs using SKIP LOCKED pattern.
181    pub async fn claim(
182        &self,
183        worker_id: Uuid,
184        capabilities: &[String],
185        limit: i32,
186    ) -> Result<Vec<JobRecord>, sqlx::Error> {
187        let rows = sqlx::query(
188            r#"
189            WITH claimable AS (
190                SELECT id
191                FROM forge_jobs
192                WHERE status = 'pending'
193                  AND scheduled_at <= NOW()
194                  AND (worker_capability = ANY($2) OR worker_capability IS NULL)
195                ORDER BY priority DESC, scheduled_at ASC
196                LIMIT $3
197                FOR UPDATE SKIP LOCKED
198            )
199            UPDATE forge_jobs
200            SET
201                status = 'claimed',
202                worker_id = $1,
203                claimed_at = NOW(),
204                attempts = attempts + 1
205            WHERE id IN (SELECT id FROM claimable)
206            RETURNING
207                id, job_type, input, output, job_context, status, priority,
208                attempts, max_attempts, last_error, worker_capability,
209                worker_id, idempotency_key, owner_subject, scheduled_at, created_at,
210                claimed_at, started_at, completed_at, failed_at, last_heartbeat,
211                cancel_requested_at, cancelled_at, cancel_reason
212            "#,
213        )
214        .bind(worker_id)
215        .bind(capabilities)
216        .bind(limit)
217        .fetch_all(&self.pool)
218        .await?;
219
220        let jobs = rows
221            .iter()
222            .map(|row| {
223                use sqlx::Row;
224                JobRecord {
225                    id: row.get("id"),
226                    job_type: row.get("job_type"),
227                    input: row.get("input"),
228                    output: row.get("output"),
229                    job_context: row.get("job_context"),
230                    status: row
231                        .get::<String, _>("status")
232                        .parse()
233                        .expect("valid job status from database"),
234                    priority: row.get("priority"),
235                    attempts: row.get("attempts"),
236                    max_attempts: row.get("max_attempts"),
237                    last_error: row.get("last_error"),
238                    worker_capability: row.get("worker_capability"),
239                    worker_id: row.get("worker_id"),
240                    idempotency_key: row.get("idempotency_key"),
241                    owner_subject: row.get("owner_subject"),
242                    scheduled_at: row.get("scheduled_at"),
243                    created_at: row.get("created_at"),
244                    claimed_at: row.get("claimed_at"),
245                    started_at: row.get("started_at"),
246                    completed_at: row.get("completed_at"),
247                    failed_at: row.get("failed_at"),
248                    last_heartbeat: row.get("last_heartbeat"),
249                    cancel_requested_at: row.get("cancel_requested_at"),
250                    cancelled_at: row.get("cancelled_at"),
251                    cancel_reason: row.get("cancel_reason"),
252                }
253            })
254            .collect();
255
256        Ok(jobs)
257    }
258
259    /// Mark job as running.
260    pub async fn start(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
261        let result = sqlx::query(
262            r#"
263            UPDATE forge_jobs
264            SET status = 'running', started_at = NOW(), last_heartbeat = NOW()
265            WHERE id = $1
266              AND status NOT IN ('cancel_requested', 'cancelled')
267            "#,
268        )
269        .bind(job_id)
270        .execute(&self.pool)
271        .await?;
272
273        if result.rows_affected() == 0 {
274            return Err(sqlx::Error::RowNotFound);
275        }
276
277        Ok(())
278    }
279
280    /// Mark job as completed.
281    ///
282    /// If `ttl` is provided, sets `expires_at` for automatic cleanup.
283    pub async fn complete(
284        &self,
285        job_id: Uuid,
286        output: serde_json::Value,
287        ttl: Option<std::time::Duration>,
288    ) -> Result<(), sqlx::Error> {
289        let expires_at = ttl.map(|d| {
290            chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
291        });
292
293        sqlx::query(
294            r#"
295            UPDATE forge_jobs
296            SET
297                status = 'completed',
298                output = $2,
299                completed_at = NOW(),
300                cancel_requested_at = NULL,
301                cancelled_at = NULL,
302                cancel_reason = NULL,
303                expires_at = $3
304            WHERE id = $1
305            "#,
306        )
307        .bind(job_id)
308        .bind(output)
309        .bind(expires_at)
310        .execute(&self.pool)
311        .await?;
312
313        Ok(())
314    }
315
316    /// Mark job as failed, schedule retry or move to dead letter.
317    ///
318    /// If `ttl` is provided and job moves to dead_letter, sets `expires_at` for automatic cleanup.
319    pub async fn fail(
320        &self,
321        job_id: Uuid,
322        error: &str,
323        retry_delay: Option<chrono::Duration>,
324        ttl: Option<std::time::Duration>,
325    ) -> Result<(), sqlx::Error> {
326        if let Some(delay) = retry_delay {
327            // Schedule retry
328            sqlx::query(
329                r#"
330                UPDATE forge_jobs
331                SET
332                    status = 'pending',
333                    worker_id = NULL,
334                    claimed_at = NULL,
335                    started_at = NULL,
336                    last_error = $2,
337                    scheduled_at = NOW() + $3,
338                    cancel_requested_at = NULL,
339                    cancelled_at = NULL,
340                    cancel_reason = NULL
341                WHERE id = $1
342                "#,
343            )
344            .bind(job_id)
345            .bind(error)
346            .bind(delay)
347            .execute(&self.pool)
348            .await?;
349        } else {
350            // Move to dead letter
351            let expires_at = ttl.map(|d| {
352                chrono::Utc::now()
353                    + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
354            });
355
356            sqlx::query(
357                r#"
358                UPDATE forge_jobs
359                SET
360                    status = 'dead_letter',
361                    last_error = $2,
362                    failed_at = NOW(),
363                    cancel_requested_at = NULL,
364                    cancelled_at = NULL,
365                    cancel_reason = NULL,
366                    expires_at = $3
367                WHERE id = $1
368                "#,
369            )
370            .bind(job_id)
371            .bind(error)
372            .bind(expires_at)
373            .execute(&self.pool)
374            .await?;
375        }
376
377        Ok(())
378    }
379
380    /// Update heartbeat for a running job.
381    pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
382        sqlx::query(
383            r#"
384            UPDATE forge_jobs
385            SET last_heartbeat = NOW()
386            WHERE id = $1
387            "#,
388        )
389        .bind(job_id)
390        .execute(&self.pool)
391        .await?;
392
393        Ok(())
394    }
395
396    /// Update job progress.
397    pub async fn update_progress(
398        &self,
399        job_id: Uuid,
400        percent: i32,
401        message: &str,
402    ) -> Result<(), sqlx::Error> {
403        sqlx::query(
404            r#"
405            UPDATE forge_jobs
406            SET progress_percent = $2, progress_message = $3, last_heartbeat = NOW()
407            WHERE id = $1
408            "#,
409        )
410        .bind(job_id)
411        .bind(percent)
412        .bind(message)
413        .execute(&self.pool)
414        .await?;
415
416        Ok(())
417    }
418
419    /// Replace persisted job context.
420    pub async fn set_context(
421        &self,
422        job_id: Uuid,
423        context: serde_json::Value,
424    ) -> Result<(), sqlx::Error> {
425        sqlx::query(
426            r#"
427            UPDATE forge_jobs
428            SET job_context = $2
429            WHERE id = $1
430            "#,
431        )
432        .bind(job_id)
433        .bind(context)
434        .execute(&self.pool)
435        .await?;
436
437        Ok(())
438    }
439
440    /// Request cancellation for a job.
441    pub async fn request_cancel(
442        &self,
443        job_id: Uuid,
444        reason: Option<&str>,
445    ) -> Result<bool, sqlx::Error> {
446        let row: Option<(String,)> = sqlx::query_as(
447            r#"
448            SELECT status
449            FROM forge_jobs
450            WHERE id = $1
451            "#,
452        )
453        .bind(job_id)
454        .fetch_optional(&self.pool)
455        .await?;
456
457        let status = match row {
458            Some((status,)) => status,
459            None => return Ok(false),
460        };
461
462        let terminal_statuses = [
463            JobStatus::Completed.as_str(),
464            JobStatus::Failed.as_str(),
465            JobStatus::DeadLetter.as_str(),
466            JobStatus::Cancelled.as_str(),
467        ];
468
469        if status == JobStatus::Running.as_str() {
470            let updated = sqlx::query(
471                r#"
472                UPDATE forge_jobs
473                SET
474                    status = 'cancel_requested',
475                    cancel_requested_at = NOW(),
476                    cancel_reason = COALESCE($2, cancel_reason)
477                WHERE id = $1
478                  AND status = 'running'
479                "#,
480            )
481            .bind(job_id)
482            .bind(reason)
483            .execute(&self.pool)
484            .await?;
485
486            return Ok(updated.rows_affected() > 0);
487        }
488
489        if terminal_statuses.contains(&status.as_str()) {
490            return Ok(false);
491        }
492
493        let updated = sqlx::query(
494            r#"
495            UPDATE forge_jobs
496            SET
497                status = 'cancelled',
498                cancelled_at = NOW(),
499                cancel_reason = COALESCE($2, cancel_reason)
500            WHERE id = $1
501              AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
502            "#,
503        )
504        .bind(job_id)
505        .bind(reason)
506        .execute(&self.pool)
507        .await?;
508
509        Ok(updated.rows_affected() > 0)
510    }
511
512    /// Mark job as cancelled.
513    ///
514    /// If `ttl` is provided, sets `expires_at` for automatic cleanup.
515    pub async fn cancel(
516        &self,
517        job_id: Uuid,
518        reason: Option<&str>,
519        ttl: Option<std::time::Duration>,
520    ) -> Result<(), sqlx::Error> {
521        let expires_at = ttl.map(|d| {
522            chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
523        });
524
525        sqlx::query(
526            r#"
527            UPDATE forge_jobs
528            SET
529                status = 'cancelled',
530                cancelled_at = NOW(),
531                cancel_reason = COALESCE($2, cancel_reason),
532                expires_at = $3
533            WHERE id = $1
534            "#,
535        )
536        .bind(job_id)
537        .bind(reason)
538        .bind(expires_at)
539        .execute(&self.pool)
540        .await?;
541
542        Ok(())
543    }
544
545    /// Release stale jobs back to pending.
546    pub async fn release_stale(
547        &self,
548        stale_threshold: chrono::Duration,
549    ) -> Result<u64, sqlx::Error> {
550        let result = sqlx::query(
551            r#"
552            UPDATE forge_jobs
553            SET
554                status = 'pending',
555                worker_id = NULL,
556                claimed_at = NULL,
557                started_at = NULL,
558                last_heartbeat = NULL
559            WHERE
560                (
561                    status = 'claimed'
562                    AND claimed_at < NOW() - $1
563                )
564                OR (
565                    status = 'running'
566                    AND COALESCE(last_heartbeat, started_at, claimed_at) < NOW() - $1
567                )
568            "#,
569        )
570        .bind(stale_threshold)
571        .execute(&self.pool)
572        .await?;
573
574        Ok(result.rows_affected())
575    }
576
577    /// Delete expired job records.
578    ///
579    /// Only deletes terminal jobs (completed, cancelled, failed, dead_letter)
580    /// that have passed their TTL.
581    pub async fn cleanup_expired(&self) -> Result<u64, sqlx::Error> {
582        let result = sqlx::query(
583            r#"
584            DELETE FROM forge_jobs
585            WHERE expires_at IS NOT NULL
586              AND expires_at < NOW()
587              AND status IN ('completed', 'cancelled', 'failed', 'dead_letter')
588            "#,
589        )
590        .execute(&self.pool)
591        .await?;
592
593        Ok(result.rows_affected())
594    }
595
596    /// Get queue statistics.
597    pub async fn stats(&self) -> Result<QueueStats, sqlx::Error> {
598        let row = sqlx::query(
599            r#"
600            SELECT
601                COUNT(*) FILTER (WHERE status = 'pending') as pending,
602                COUNT(*) FILTER (WHERE status = 'claimed') as claimed,
603                COUNT(*) FILTER (WHERE status = 'running') as running,
604                COUNT(*) FILTER (WHERE status = 'completed') as completed,
605                COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled,
606                COUNT(*) FILTER (WHERE status = 'failed') as failed,
607                COUNT(*) FILTER (WHERE status = 'dead_letter') as dead_letter
608            FROM forge_jobs
609            "#,
610        )
611        .fetch_one(&self.pool)
612        .await?;
613
614        use sqlx::Row;
615        Ok(QueueStats {
616            pending: row.get::<i64, _>("pending") as u64,
617            claimed: row.get::<i64, _>("claimed") as u64,
618            running: row.get::<i64, _>("running") as u64,
619            completed: row.get::<i64, _>("completed") as u64,
620            cancelled: row.get::<i64, _>("cancelled") as u64,
621            failed: row.get::<i64, _>("failed") as u64,
622            dead_letter: row.get::<i64, _>("dead_letter") as u64,
623        })
624    }
625}
626
/// Snapshot of how many jobs sit in each status, as produced by `JobQueue::stats`.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Jobs waiting to be claimed.
    pub pending: u64,
    /// Jobs claimed by a worker but not yet started.
    pub claimed: u64,
    /// Jobs currently executing.
    pub running: u64,
    /// Jobs that finished successfully.
    pub completed: u64,
    /// Jobs that were cancelled.
    pub cancelled: u64,
    /// Jobs in the `failed` status.
    pub failed: u64,
    /// Jobs moved to the dead-letter status.
    pub dead_letter: u64,
}
638
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh record starts pending, with no attempts and the Normal priority value.
    #[test]
    fn test_job_record_creation() {
        let record = JobRecord::new("send_email", serde_json::json!({}), JobPriority::Normal, 3);

        assert_eq!(record.job_type.as_str(), "send_email");
        assert_eq!(record.status, JobStatus::Pending);
        assert_eq!(
            (record.priority, record.attempts, record.max_attempts),
            (50, 0, 3)
        );
    }

    /// The capability builder stores the requirement and leaves the priority intact.
    #[test]
    fn test_job_record_with_capability() {
        let record = JobRecord::new("transcode", serde_json::json!({}), JobPriority::High, 3)
            .with_capability("media");

        assert_eq!(record.worker_capability.as_deref(), Some("media"));
        assert_eq!(record.priority, 75);
    }

    /// The idempotency builder stores the key verbatim.
    #[test]
    fn test_job_record_with_idempotency() {
        let record = JobRecord::new("payment", serde_json::json!({}), JobPriority::Critical, 5)
            .with_idempotency_key("payment-123");

        assert_eq!(record.idempotency_key.as_deref(), Some("payment-123"));
    }
}