Skip to main content

forge_runtime/jobs/
queue.rs

1use chrono::{DateTime, Utc};
2use forge_core::job::{JobPriority, JobStatus};
3use uuid::Uuid;
4
/// A job record in the database.
///
/// In-memory form of a row in the `forge_jobs` table as written by
/// `JobQueue::enqueue` and read back by `JobQueue::claim`. Not every column is
/// represented here: `expires_at` and the progress columns are only written by
/// `JobQueue` methods.
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Unique job ID.
    pub id: Uuid,
    /// Job type/name.
    pub job_type: String,
    /// Job input as JSON.
    pub input: serde_json::Value,
    /// Job output as JSON (if completed).
    pub output: Option<serde_json::Value>,
    /// Persisted context data. Starts as an empty JSON object and is replaced
    /// wholesale by `JobQueue::set_context`.
    pub job_context: serde_json::Value,
    /// Current status, stored in the database as `JobStatus::as_str()`.
    pub status: JobStatus,
    /// Priority level as an integer (`JobPriority::as_i32()`); higher values are
    /// claimed first.
    pub priority: i32,
    /// Number of attempts made; incremented each time the job is claimed.
    pub attempts: i32,
    /// Maximum attempts allowed.
    ///
    /// NOTE(review): no query in this module compares this against `attempts`;
    /// presumably the caller enforces it when deciding how to call `fail`.
    pub max_attempts: i32,
    /// Last error message, as recorded by `JobQueue::fail`.
    pub last_error: Option<String>,
    /// Required worker capability; `None` means any worker may claim the job.
    pub worker_capability: Option<String>,
    /// Worker ID that claimed the job; cleared when the job is retried or
    /// released as stale.
    pub worker_id: Option<Uuid>,
    /// Idempotency key for deduplication: while a job with the same key is not
    /// in a terminal status, `JobQueue::enqueue` returns that job's ID instead
    /// of inserting a new one.
    pub idempotency_key: Option<String>,
    /// Principal that created the job (for access control). When set,
    /// `JobQueue::request_cancel` only succeeds for a caller with the same subject.
    pub owner_subject: Option<String>,
    /// Earliest time at which the job may be claimed.
    pub scheduled_at: DateTime<Utc>,
    /// When the job was created.
    pub created_at: DateTime<Utc>,
    /// When the job was claimed.
    pub claimed_at: Option<DateTime<Utc>>,
    /// When the job started running.
    pub started_at: Option<DateTime<Utc>>,
    /// When the job completed.
    pub completed_at: Option<DateTime<Utc>>,
    /// When the job failed permanently (set when it is moved to `dead_letter`).
    pub failed_at: Option<DateTime<Utc>>,
    /// Last heartbeat time; used by `JobQueue::release_stale` to detect
    /// abandoned running jobs.
    pub last_heartbeat: Option<DateTime<Utc>>,
    /// When cancellation of a running job was requested.
    pub cancel_requested_at: Option<DateTime<Utc>>,
    /// When the job was cancelled.
    pub cancelled_at: Option<DateTime<Utc>>,
    /// Cancellation reason.
    pub cancel_reason: Option<String>,
}
57
58impl JobRecord {
59    /// Create a new job record.
60    pub fn new(
61        job_type: impl Into<String>,
62        input: serde_json::Value,
63        priority: JobPriority,
64        max_attempts: i32,
65    ) -> Self {
66        Self {
67            id: Uuid::new_v4(),
68            job_type: job_type.into(),
69            input,
70            output: None,
71            job_context: serde_json::json!({}),
72            status: JobStatus::Pending,
73            priority: priority.as_i32(),
74            attempts: 0,
75            max_attempts,
76            last_error: None,
77            worker_capability: None,
78            worker_id: None,
79            idempotency_key: None,
80            owner_subject: None,
81            scheduled_at: Utc::now(),
82            created_at: Utc::now(),
83            claimed_at: None,
84            started_at: None,
85            completed_at: None,
86            failed_at: None,
87            last_heartbeat: None,
88            cancel_requested_at: None,
89            cancelled_at: None,
90            cancel_reason: None,
91        }
92    }
93
94    /// Set worker capability requirement.
95    pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
96        self.worker_capability = Some(capability.into());
97        self
98    }
99
100    /// Set scheduled time.
101    pub fn with_scheduled_at(mut self, at: DateTime<Utc>) -> Self {
102        self.scheduled_at = at;
103        self
104    }
105
106    /// Set idempotency key.
107    pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
108        self.idempotency_key = Some(key.into());
109        self
110    }
111
112    /// Set owner subject.
113    pub fn with_owner_subject(mut self, owner_subject: Option<String>) -> Self {
114        self.owner_subject = owner_subject;
115        self
116    }
117}
118
/// Job queue operations.
///
/// Handle for enqueuing, claiming and transitioning jobs stored in the
/// `forge_jobs` Postgres table. It holds no state besides the connection pool;
/// `Clone` clones the pool handle.
#[derive(Clone)]
pub struct JobQueue {
    /// Pool that every query of this queue runs against.
    pool: sqlx::PgPool,
}
124
125impl JobQueue {
126    /// Create a new job queue.
127    pub fn new(pool: sqlx::PgPool) -> Self {
128        Self { pool }
129    }
130
131    /// Enqueue a new job.
132    pub async fn enqueue(&self, job: JobRecord) -> Result<Uuid, sqlx::Error> {
133        // Check for duplicate if idempotency key is set
134        if let Some(ref key) = job.idempotency_key {
135            let existing = sqlx::query_scalar!(
136                r#"
137                SELECT id FROM forge_jobs
138                WHERE idempotency_key = $1
139                  AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
140                "#,
141                key
142            )
143            .fetch_optional(&self.pool)
144            .await?;
145
146            if let Some(id) = existing {
147                return Ok(id); // Return existing job ID
148            }
149        }
150
151        sqlx::query!(
152            r#"
153            INSERT INTO forge_jobs (
154                id, job_type, input, job_context, status, priority, attempts, max_attempts,
155                worker_capability, idempotency_key, owner_subject, scheduled_at, created_at
156            ) VALUES (
157                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
158            )
159            "#,
160            job.id,
161            &job.job_type,
162            job.input as _,
163            job.job_context as _,
164            job.status.as_str(),
165            job.priority,
166            job.attempts,
167            job.max_attempts,
168            job.worker_capability as _,
169            job.idempotency_key as _,
170            job.owner_subject as _,
171            job.scheduled_at,
172            job.created_at,
173        )
174        .execute(&self.pool)
175        .await?;
176
177        Ok(job.id)
178    }
179
180    /// Claim jobs using SKIP LOCKED pattern.
181    pub async fn claim(
182        &self,
183        worker_id: Uuid,
184        capabilities: &[String],
185        limit: i32,
186    ) -> Result<Vec<JobRecord>, sqlx::Error> {
187        let rows = sqlx::query!(
188            r#"
189            WITH claimable AS (
190                SELECT id
191                FROM forge_jobs
192                WHERE status = 'pending'
193                  AND scheduled_at <= NOW()
194                  AND (worker_capability = ANY($2) OR worker_capability IS NULL)
195                ORDER BY priority DESC, scheduled_at ASC
196                LIMIT $3
197                FOR UPDATE SKIP LOCKED
198            )
199            UPDATE forge_jobs
200            SET
201                status = 'claimed',
202                worker_id = $1,
203                claimed_at = NOW(),
204                attempts = attempts + 1
205            WHERE id IN (SELECT id FROM claimable)
206            RETURNING
207                id, job_type, input, output, job_context, status, priority,
208                attempts, max_attempts, last_error, worker_capability,
209                worker_id, idempotency_key, owner_subject, scheduled_at, created_at,
210                claimed_at, started_at, completed_at, failed_at, last_heartbeat,
211                cancel_requested_at, cancelled_at, cancel_reason
212            "#,
213            worker_id,
214            capabilities,
215            limit as i64,
216        )
217        .fetch_all(&self.pool)
218        .await?;
219
220        let jobs = rows
221            .into_iter()
222            .map(|row| JobRecord {
223                id: row.id,
224                job_type: row.job_type,
225                input: row.input,
226                output: row.output,
227                job_context: row.job_context,
228                status: row
229                    .status
230                    .parse()
231                    .unwrap_or(forge_core::job::JobStatus::Failed),
232                priority: row.priority,
233                attempts: row.attempts,
234                max_attempts: row.max_attempts,
235                last_error: row.last_error,
236                worker_capability: row.worker_capability,
237                worker_id: row.worker_id,
238                idempotency_key: row.idempotency_key,
239                owner_subject: row.owner_subject,
240                scheduled_at: row.scheduled_at,
241                created_at: row.created_at,
242                claimed_at: row.claimed_at,
243                started_at: row.started_at,
244                completed_at: row.completed_at,
245                failed_at: row.failed_at,
246                last_heartbeat: row.last_heartbeat,
247                cancel_requested_at: row.cancel_requested_at,
248                cancelled_at: row.cancelled_at,
249                cancel_reason: row.cancel_reason,
250            })
251            .collect();
252
253        Ok(jobs)
254    }
255
256    /// Mark job as running.
257    pub async fn start(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
258        let result = sqlx::query!(
259            r#"
260            UPDATE forge_jobs
261            SET status = 'running', started_at = NOW(), last_heartbeat = NOW()
262            WHERE id = $1
263              AND status NOT IN ('cancel_requested', 'cancelled')
264            "#,
265            job_id,
266        )
267        .execute(&self.pool)
268        .await?;
269
270        if result.rows_affected() == 0 {
271            return Err(sqlx::Error::RowNotFound);
272        }
273
274        Ok(())
275    }
276
277    /// Mark job as completed.
278    ///
279    /// If `ttl` is provided, sets `expires_at` for automatic cleanup.
280    pub async fn complete(
281        &self,
282        job_id: Uuid,
283        output: serde_json::Value,
284        ttl: Option<std::time::Duration>,
285    ) -> Result<(), sqlx::Error> {
286        let expires_at = ttl.map(|d| {
287            chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
288        });
289
290        sqlx::query!(
291            r#"
292            UPDATE forge_jobs
293            SET
294                status = 'completed',
295                output = $2,
296                completed_at = NOW(),
297                cancel_requested_at = NULL,
298                cancelled_at = NULL,
299                cancel_reason = NULL,
300                expires_at = $3
301            WHERE id = $1
302            "#,
303            job_id,
304            output as _,
305            expires_at,
306        )
307        .execute(&self.pool)
308        .await?;
309
310        Ok(())
311    }
312
313    /// Mark job as failed, schedule retry or move to dead letter.
314    ///
315    /// If `ttl` is provided and job moves to dead_letter, sets `expires_at` for automatic cleanup.
316    pub async fn fail(
317        &self,
318        job_id: Uuid,
319        error: &str,
320        retry_delay: Option<chrono::Duration>,
321        ttl: Option<std::time::Duration>,
322    ) -> Result<(), sqlx::Error> {
323        if let Some(delay) = retry_delay {
324            // Schedule retry
325            sqlx::query!(
326                r#"
327                UPDATE forge_jobs
328                SET
329                    status = 'pending',
330                    worker_id = NULL,
331                    claimed_at = NULL,
332                    started_at = NULL,
333                    last_error = $2,
334                    scheduled_at = NOW() + make_interval(secs => $3),
335                    cancel_requested_at = NULL,
336                    cancelled_at = NULL,
337                    cancel_reason = NULL
338                WHERE id = $1
339                "#,
340                job_id,
341                error,
342                delay.num_seconds() as f64,
343            )
344            .execute(&self.pool)
345            .await?;
346        } else {
347            // Move to dead letter
348            let expires_at = ttl.map(|d| {
349                chrono::Utc::now()
350                    + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
351            });
352
353            sqlx::query!(
354                r#"
355                UPDATE forge_jobs
356                SET
357                    status = 'dead_letter',
358                    last_error = $2,
359                    failed_at = NOW(),
360                    cancel_requested_at = NULL,
361                    cancelled_at = NULL,
362                    cancel_reason = NULL,
363                    expires_at = $3
364                WHERE id = $1
365                "#,
366                job_id,
367                error,
368                expires_at,
369            )
370            .execute(&self.pool)
371            .await?;
372        }
373
374        Ok(())
375    }
376
377    /// Update heartbeat for a running job.
378    pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
379        sqlx::query!(
380            r#"
381            UPDATE forge_jobs
382            SET last_heartbeat = NOW()
383            WHERE id = $1
384            "#,
385            job_id,
386        )
387        .execute(&self.pool)
388        .await?;
389
390        Ok(())
391    }
392
393    /// Update job progress.
394    pub async fn update_progress(
395        &self,
396        job_id: Uuid,
397        percent: i32,
398        message: &str,
399    ) -> Result<(), sqlx::Error> {
400        sqlx::query!(
401            r#"
402            UPDATE forge_jobs
403            SET progress_percent = $2, progress_message = $3, last_heartbeat = NOW()
404            WHERE id = $1
405            "#,
406            job_id,
407            percent,
408            message,
409        )
410        .execute(&self.pool)
411        .await?;
412
413        Ok(())
414    }
415
416    /// Replace persisted job context.
417    pub async fn set_context(
418        &self,
419        job_id: Uuid,
420        context: serde_json::Value,
421    ) -> Result<(), sqlx::Error> {
422        sqlx::query!(
423            r#"
424            UPDATE forge_jobs
425            SET job_context = $2
426            WHERE id = $1
427            "#,
428            job_id,
429            context as _,
430        )
431        .execute(&self.pool)
432        .await?;
433
434        Ok(())
435    }
436
437    /// Request cancellation for a job.
438    ///
439    /// If `caller_subject` is provided, the cancellation will only succeed if
440    /// the job has no `owner_subject` or the `owner_subject` matches the caller.
441    /// This prevents unauthorized users from cancelling other users' jobs.
442    pub async fn request_cancel(
443        &self,
444        job_id: Uuid,
445        reason: Option<&str>,
446        caller_subject: Option<&str>,
447    ) -> Result<bool, sqlx::Error> {
448        let row = sqlx::query!(
449            "SELECT status, owner_subject FROM forge_jobs WHERE id = $1",
450            job_id
451        )
452        .fetch_optional(&self.pool)
453        .await?;
454
455        let (status, owner_subject) = match row {
456            Some(r) => (r.status, r.owner_subject),
457            None => return Ok(false),
458        };
459
460        // Verify ownership: if job has an owner, caller must match.
461        // Reject if no caller_subject is provided for an owned job.
462        if let Some(ref owner) = owner_subject {
463            match caller_subject {
464                Some(caller) if caller == owner => { /* authorized */ }
465                _ => return Ok(false), // no caller or mismatch -> deny
466            }
467        }
468
469        let terminal_statuses = [
470            JobStatus::Completed.as_str(),
471            JobStatus::Failed.as_str(),
472            JobStatus::DeadLetter.as_str(),
473            JobStatus::Cancelled.as_str(),
474        ];
475
476        if status == JobStatus::Running.as_str() {
477            let updated = sqlx::query!(
478                r#"
479                UPDATE forge_jobs
480                SET
481                    status = 'cancel_requested',
482                    cancel_requested_at = NOW(),
483                    cancel_reason = COALESCE($2, cancel_reason)
484                WHERE id = $1
485                  AND status = 'running'
486                "#,
487                job_id,
488                reason,
489            )
490            .execute(&self.pool)
491            .await?;
492
493            return Ok(updated.rows_affected() > 0);
494        }
495
496        if terminal_statuses.contains(&status.as_str()) {
497            return Ok(false);
498        }
499
500        let updated = sqlx::query!(
501            r#"
502            UPDATE forge_jobs
503            SET
504                status = 'cancelled',
505                cancelled_at = NOW(),
506                cancel_reason = COALESCE($2, cancel_reason)
507            WHERE id = $1
508              AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
509            "#,
510            job_id,
511            reason,
512        )
513        .execute(&self.pool)
514        .await?;
515
516        Ok(updated.rows_affected() > 0)
517    }
518
519    /// Mark job as cancelled.
520    ///
521    /// If `ttl` is provided, sets `expires_at` for automatic cleanup.
522    pub async fn cancel(
523        &self,
524        job_id: Uuid,
525        reason: Option<&str>,
526        ttl: Option<std::time::Duration>,
527    ) -> Result<(), sqlx::Error> {
528        let expires_at = ttl.map(|d| {
529            chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
530        });
531
532        sqlx::query!(
533            r#"
534            UPDATE forge_jobs
535            SET
536                status = 'cancelled',
537                cancelled_at = NOW(),
538                cancel_reason = COALESCE($2, cancel_reason),
539                expires_at = $3
540            WHERE id = $1
541            "#,
542            job_id,
543            reason,
544            expires_at,
545        )
546        .execute(&self.pool)
547        .await?;
548
549        Ok(())
550    }
551
552    /// Release stale jobs back to pending.
553    pub async fn release_stale(
554        &self,
555        stale_threshold: chrono::Duration,
556    ) -> Result<u64, sqlx::Error> {
557        let result = sqlx::query!(
558            r#"
559            UPDATE forge_jobs
560            SET
561                status = 'pending',
562                worker_id = NULL,
563                claimed_at = NULL,
564                started_at = NULL,
565                last_heartbeat = NULL
566            WHERE
567                (
568                    status = 'claimed'
569                    AND claimed_at < NOW() - make_interval(secs => $1)
570                )
571                OR (
572                    status = 'running'
573                    AND COALESCE(last_heartbeat, started_at, claimed_at) < NOW() - make_interval(secs => $1)
574                )
575            "#,
576            stale_threshold.num_seconds() as f64,
577        )
578        .execute(&self.pool)
579        .await?;
580
581        Ok(result.rows_affected())
582    }
583
584    /// Delete expired job records.
585    ///
586    /// Only deletes terminal jobs (completed, cancelled, failed, dead_letter)
587    /// that have passed their TTL.
588    pub async fn cleanup_expired(&self) -> Result<u64, sqlx::Error> {
589        let result = sqlx::query!(
590            r#"
591            DELETE FROM forge_jobs
592            WHERE expires_at IS NOT NULL
593              AND expires_at < NOW()
594              AND status IN ('completed', 'cancelled', 'failed', 'dead_letter')
595            "#,
596        )
597        .execute(&self.pool)
598        .await?;
599
600        Ok(result.rows_affected())
601    }
602
603    /// Get queue statistics.
604    pub async fn stats(&self) -> Result<QueueStats, sqlx::Error> {
605        let row = sqlx::query!(
606            r#"
607            SELECT
608                COUNT(*) FILTER (WHERE status = 'pending') as "pending!",
609                COUNT(*) FILTER (WHERE status = 'claimed') as "claimed!",
610                COUNT(*) FILTER (WHERE status = 'running') as "running!",
611                COUNT(*) FILTER (WHERE status = 'completed') as "completed!",
612                COUNT(*) FILTER (WHERE status = 'cancelled') as "cancelled!",
613                COUNT(*) FILTER (WHERE status = 'failed') as "failed!",
614                COUNT(*) FILTER (WHERE status = 'dead_letter') as "dead_letter!"
615            FROM forge_jobs
616            "#,
617        )
618        .fetch_one(&self.pool)
619        .await?;
620
621        Ok(QueueStats {
622            pending: row.pending as u64,
623            claimed: row.claimed as u64,
624            running: row.running as u64,
625            completed: row.completed as u64,
626            cancelled: row.cancelled as u64,
627            failed: row.failed as u64,
628            dead_letter: row.dead_letter as u64,
629        })
630    }
631}
632
/// Queue statistics.
///
/// Per-status job counts over the whole `forge_jobs` table, as produced by
/// `JobQueue::stats`. Jobs in `cancel_requested` status are not counted in any
/// field.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Jobs in `pending`, including retries scheduled for the future.
    pub pending: u64,
    /// Jobs in `claimed` (taken by a worker but not yet started).
    pub claimed: u64,
    /// Jobs in `running`.
    pub running: u64,
    /// Jobs in `completed`.
    pub completed: u64,
    /// Jobs in `cancelled`.
    pub cancelled: u64,
    /// Jobs in `failed`.
    pub failed: u64,
    /// Jobs in `dead_letter`.
    pub dead_letter: u64,
}
644
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a record with an empty JSON input, which every test below uses.
    fn empty_input_job(job_type: &str, priority: JobPriority, max_attempts: i32) -> JobRecord {
        JobRecord::new(job_type, serde_json::json!({}), priority, max_attempts)
    }

    #[test]
    fn test_job_record_creation() {
        let record = empty_input_job("send_email", JobPriority::Normal, 3);

        assert_eq!(record.job_type, "send_email");
        assert_eq!(record.status, JobStatus::Pending);
        assert_eq!(record.priority, 50);
        assert_eq!(record.attempts, 0);
        assert_eq!(record.max_attempts, 3);
    }

    #[test]
    fn test_job_record_with_capability() {
        let record = empty_input_job("transcode", JobPriority::High, 3).with_capability("media");

        assert_eq!(record.worker_capability.as_deref(), Some("media"));
        assert_eq!(record.priority, 75);
    }

    #[test]
    fn test_job_record_with_idempotency() {
        let record =
            empty_input_job("payment", JobPriority::Critical, 5).with_idempotency_key("payment-123");

        assert_eq!(record.idempotency_key.as_deref(), Some("payment-123"));
    }
}