//! forge_runtime/jobs/queue.rs
//!
//! PostgreSQL-backed job queue: enqueue, claim (FOR UPDATE SKIP LOCKED),
//! lifecycle transitions, cancellation, and maintenance operations over
//! the `forge_jobs` table.
1use chrono::{DateTime, Utc};
2use forge_core::job::{JobPriority, JobStatus};
3use uuid::Uuid;
4
/// A job record in the database.
///
/// Mirrors one row of the `forge_jobs` table. All timestamps are UTC;
/// `Option` fields correspond to nullable columns that are populated as
/// the job moves through its lifecycle.
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Unique job ID.
    pub id: Uuid,
    /// Job type/name.
    pub job_type: String,
    /// Job input as JSON.
    pub input: serde_json::Value,
    /// Job output as JSON (if completed).
    pub output: Option<serde_json::Value>,
    /// Persisted context data.
    pub job_context: serde_json::Value,
    /// Current status.
    pub status: JobStatus,
    /// Priority level (higher values are claimed first).
    pub priority: i32,
    /// Number of attempts made (incremented when a worker claims the job).
    pub attempts: i32,
    /// Maximum attempts allowed.
    pub max_attempts: i32,
    /// Last error message.
    pub last_error: Option<String>,
    /// Required worker capability; `None` means any worker may claim it.
    pub worker_capability: Option<String>,
    /// Worker ID that claimed the job.
    pub worker_id: Option<Uuid>,
    /// Idempotency key for deduplication.
    pub idempotency_key: Option<String>,
    /// Principal that created the job (for access control).
    pub owner_subject: Option<String>,
    /// When the job is scheduled to run (jobs are not claimable before this).
    pub scheduled_at: DateTime<Utc>,
    /// When the job was created.
    pub created_at: DateTime<Utc>,
    /// When the job was claimed.
    pub claimed_at: Option<DateTime<Utc>>,
    /// When the job started running.
    pub started_at: Option<DateTime<Utc>>,
    /// When the job completed.
    pub completed_at: Option<DateTime<Utc>>,
    /// When the job failed.
    pub failed_at: Option<DateTime<Utc>>,
    /// Last heartbeat time (used to detect stale/abandoned jobs).
    pub last_heartbeat: Option<DateTime<Utc>>,
    /// Cancellation requested at.
    pub cancel_requested_at: Option<DateTime<Utc>>,
    /// Cancellation completed at.
    pub cancelled_at: Option<DateTime<Utc>>,
    /// Cancellation reason.
    pub cancel_reason: Option<String>,
}
57
58impl JobRecord {
59    /// Create a new job record.
60    pub fn new(
61        job_type: impl Into<String>,
62        input: serde_json::Value,
63        priority: JobPriority,
64        max_attempts: i32,
65    ) -> Self {
66        Self {
67            id: Uuid::new_v4(),
68            job_type: job_type.into(),
69            input,
70            output: None,
71            job_context: serde_json::json!({}),
72            status: JobStatus::Pending,
73            priority: priority.as_i32(),
74            attempts: 0,
75            max_attempts,
76            last_error: None,
77            worker_capability: None,
78            worker_id: None,
79            idempotency_key: None,
80            owner_subject: None,
81            scheduled_at: Utc::now(),
82            created_at: Utc::now(),
83            claimed_at: None,
84            started_at: None,
85            completed_at: None,
86            failed_at: None,
87            last_heartbeat: None,
88            cancel_requested_at: None,
89            cancelled_at: None,
90            cancel_reason: None,
91        }
92    }
93
94    /// Set worker capability requirement.
95    pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
96        self.worker_capability = Some(capability.into());
97        self
98    }
99
100    /// Set scheduled time.
101    pub fn with_scheduled_at(mut self, at: DateTime<Utc>) -> Self {
102        self.scheduled_at = at;
103        self
104    }
105
106    /// Set idempotency key.
107    pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
108        self.idempotency_key = Some(key.into());
109        self
110    }
111
112    /// Set owner subject.
113    pub fn with_owner_subject(mut self, owner_subject: Option<String>) -> Self {
114        self.owner_subject = owner_subject;
115        self
116    }
117}
118
/// Job queue operations.
///
/// A thin, cheaply-cloneable handle over a PostgreSQL connection pool; all
/// queue state lives in the `forge_jobs` table, so clones share the queue.
#[derive(Clone)]
pub struct JobQueue {
    /// Connection pool used for every query.
    pool: sqlx::PgPool,
}
124
impl JobQueue {
    /// Create a new job queue backed by the given pool.
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self { pool }
    }

    /// Enqueue a new job, returning the stored job's ID.
    ///
    /// If the job carries an idempotency key and a *non-terminal* job with
    /// that key already exists, the existing job's ID is returned and
    /// nothing is inserted.
    ///
    /// NOTE(review): the dedup SELECT and the INSERT are separate
    /// statements, so two concurrent enqueues with the same key can race.
    /// Confirm the table has a unique (partial) index on `idempotency_key`
    /// if exactly-once insertion is required.
    pub async fn enqueue(&self, job: JobRecord) -> Result<Uuid, sqlx::Error> {
        // Check for duplicate if idempotency key is set
        if let Some(ref key) = job.idempotency_key {
            let existing = sqlx::query_scalar!(
                r#"
                SELECT id FROM forge_jobs
                WHERE idempotency_key = $1
                  AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
                "#,
                key
            )
            .fetch_optional(&self.pool)
            .await?;

            if let Some(id) = existing {
                return Ok(id); // Return existing job ID
            }
        }

        sqlx::query!(
            r#"
            INSERT INTO forge_jobs (
                id, job_type, input, job_context, status, priority, attempts, max_attempts,
                worker_capability, idempotency_key, owner_subject, scheduled_at, created_at
            ) VALUES (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
            )
            "#,
            job.id,
            &job.job_type,
            job.input as _,
            job.job_context as _,
            job.status.as_str(),
            job.priority,
            job.attempts,
            job.max_attempts,
            job.worker_capability as _,
            job.idempotency_key as _,
            job.owner_subject as _,
            job.scheduled_at,
            job.created_at,
        )
        .execute(&self.pool)
        .await?;

        Ok(job.id)
    }

    /// Claim up to `limit` due pending jobs for `worker_id` using the
    /// `FOR UPDATE SKIP LOCKED` pattern, so concurrent workers never claim
    /// the same job.
    ///
    /// Jobs are selected highest-priority first, oldest `scheduled_at`
    /// first. A job is eligible when its `worker_capability` is NULL or is
    /// contained in `capabilities`; note that with an empty `capabilities`
    /// slice, `= ANY($2)` matches nothing, so only capability-less jobs are
    /// claimed. `attempts` is incremented at claim time, not at start time.
    pub async fn claim(
        &self,
        worker_id: Uuid,
        capabilities: &[String],
        limit: i32,
    ) -> Result<Vec<JobRecord>, sqlx::Error> {
        let rows = sqlx::query!(
            r#"
            WITH claimable AS (
                SELECT id
                FROM forge_jobs
                WHERE status = 'pending'
                  AND scheduled_at <= NOW()
                  AND (worker_capability = ANY($2) OR worker_capability IS NULL)
                ORDER BY priority DESC, scheduled_at ASC
                LIMIT $3
                FOR UPDATE SKIP LOCKED
            )
            UPDATE forge_jobs
            SET
                status = 'claimed',
                worker_id = $1,
                claimed_at = NOW(),
                attempts = attempts + 1
            WHERE id IN (SELECT id FROM claimable)
            RETURNING
                id, job_type, input, output, job_context, status, priority,
                attempts, max_attempts, last_error, worker_capability,
                worker_id, idempotency_key, owner_subject, scheduled_at, created_at,
                claimed_at, started_at, completed_at, failed_at, last_heartbeat,
                cancel_requested_at, cancelled_at, cancel_reason
            "#,
            worker_id,
            capabilities,
            limit as i64,
        )
        .fetch_all(&self.pool)
        .await?;

        // Map raw rows into JobRecord. An unrecognized status string
        // (e.g. schema drift) degrades to Failed rather than panicking.
        let jobs = rows
            .into_iter()
            .map(|row| JobRecord {
                id: row.id,
                job_type: row.job_type,
                input: row.input,
                output: row.output,
                job_context: row.job_context,
                status: row
                    .status
                    .parse()
                    .unwrap_or(forge_core::job::JobStatus::Failed),
                priority: row.priority,
                attempts: row.attempts,
                max_attempts: row.max_attempts,
                last_error: row.last_error,
                worker_capability: row.worker_capability,
                worker_id: row.worker_id,
                idempotency_key: row.idempotency_key,
                owner_subject: row.owner_subject,
                scheduled_at: row.scheduled_at,
                created_at: row.created_at,
                claimed_at: row.claimed_at,
                started_at: row.started_at,
                completed_at: row.completed_at,
                failed_at: row.failed_at,
                last_heartbeat: row.last_heartbeat,
                cancel_requested_at: row.cancel_requested_at,
                cancelled_at: row.cancelled_at,
                cancel_reason: row.cancel_reason,
            })
            .collect();

        Ok(jobs)
    }

    /// Mark job as running, setting `started_at` and seeding the heartbeat.
    ///
    /// Returns `sqlx::Error::RowNotFound` when the job does not exist *or*
    /// when cancellation was requested/completed in the meantime — the
    /// WHERE clause refuses to start a job in a cancel state, so a worker
    /// cannot start a job cancelled between claim and start.
    pub async fn start(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
        let result = sqlx::query!(
            r#"
            UPDATE forge_jobs
            SET status = 'running', started_at = NOW(), last_heartbeat = NOW()
            WHERE id = $1
              AND status NOT IN ('cancel_requested', 'cancelled')
            "#,
            job_id,
        )
        .execute(&self.pool)
        .await?;

        if result.rows_affected() == 0 {
            return Err(sqlx::Error::RowNotFound);
        }

        Ok(())
    }

    /// Mark job as completed, storing `output` and clearing any stale
    /// cancellation fields.
    ///
    /// If `ttl` is provided, sets `expires_at` for automatic cleanup by
    /// [`cleanup_expired`](Self::cleanup_expired).
    pub async fn complete(
        &self,
        job_id: Uuid,
        output: serde_json::Value,
        ttl: Option<std::time::Duration>,
    ) -> Result<(), sqlx::Error> {
        // A std Duration too large for chrono falls back to a 7-day TTL
        // instead of failing the completion.
        let expires_at = ttl.map(|d| {
            chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
        });

        sqlx::query!(
            r#"
            UPDATE forge_jobs
            SET
                status = 'completed',
                output = $2,
                completed_at = NOW(),
                cancel_requested_at = NULL,
                cancelled_at = NULL,
                cancel_reason = NULL,
                expires_at = $3
            WHERE id = $1
            "#,
            job_id,
            output as _,
            expires_at,
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Mark job as failed: with `retry_delay` it is re-queued as pending
    /// after the delay; without, it moves to the dead-letter state.
    ///
    /// If `ttl` is provided and the job moves to dead_letter, sets
    /// `expires_at` for automatic cleanup.
    ///
    /// NOTE(review): the retry path does not consult `max_attempts`; the
    /// caller appears responsible for deciding retry vs dead-letter —
    /// confirm against call sites.
    pub async fn fail(
        &self,
        job_id: Uuid,
        error: &str,
        retry_delay: Option<chrono::Duration>,
        ttl: Option<std::time::Duration>,
    ) -> Result<(), sqlx::Error> {
        if let Some(delay) = retry_delay {
            // Schedule retry: back to pending, release the worker, and
            // clear cancellation fields so the retry starts clean.
            sqlx::query!(
                r#"
                UPDATE forge_jobs
                SET
                    status = 'pending',
                    worker_id = NULL,
                    claimed_at = NULL,
                    started_at = NULL,
                    last_error = $2,
                    scheduled_at = NOW() + make_interval(secs => $3),
                    cancel_requested_at = NULL,
                    cancelled_at = NULL,
                    cancel_reason = NULL
                WHERE id = $1
                "#,
                job_id,
                error,
                delay.num_seconds() as f64,
            )
            .execute(&self.pool)
            .await?;
        } else {
            // Move to dead letter
            let expires_at = ttl.map(|d| {
                chrono::Utc::now()
                    + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
            });

            sqlx::query!(
                r#"
                UPDATE forge_jobs
                SET
                    status = 'dead_letter',
                    last_error = $2,
                    failed_at = NOW(),
                    cancel_requested_at = NULL,
                    cancelled_at = NULL,
                    cancel_reason = NULL,
                    expires_at = $3
                WHERE id = $1
                "#,
                job_id,
                error,
                expires_at,
            )
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }

    /// Update heartbeat for a running job (liveness signal consumed by
    /// [`release_stale`](Self::release_stale)).
    pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE forge_jobs
            SET last_heartbeat = NOW()
            WHERE id = $1
            "#,
            job_id,
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Update job progress; also refreshes the heartbeat as a side effect,
    /// since progress reporting implies the worker is alive.
    pub async fn update_progress(
        &self,
        job_id: Uuid,
        percent: i32,
        message: &str,
    ) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE forge_jobs
            SET progress_percent = $2, progress_message = $3, last_heartbeat = NOW()
            WHERE id = $1
            "#,
            job_id,
            percent,
            message,
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Replace (not merge) the persisted job context JSON.
    pub async fn set_context(
        &self,
        job_id: Uuid,
        context: serde_json::Value,
    ) -> Result<(), sqlx::Error> {
        sqlx::query!(
            r#"
            UPDATE forge_jobs
            SET job_context = $2
            WHERE id = $1
            "#,
            job_id,
            context as _,
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Request cancellation for a job. Returns `Ok(true)` if a state
    /// transition happened, `Ok(false)` if the job is missing, terminal,
    /// or the caller is not authorized.
    ///
    /// If `caller_subject` is provided, the cancellation will only succeed if
    /// the job has no `owner_subject` or the `owner_subject` matches the caller.
    /// This prevents unauthorized users from cancelling other users' jobs.
    ///
    /// Running jobs only get `cancel_requested` (the worker must observe it
    /// and stop); any other non-terminal job is cancelled outright.
    ///
    /// NOTE(review): the status/ownership read and the subsequent UPDATE are
    /// separate statements; both UPDATEs re-check status in their WHERE
    /// clause, so a concurrent transition results in a false `Ok(false)`
    /// rather than an incorrect state change.
    pub async fn request_cancel(
        &self,
        job_id: Uuid,
        reason: Option<&str>,
        caller_subject: Option<&str>,
    ) -> Result<bool, sqlx::Error> {
        let row = sqlx::query!(
            "SELECT status, owner_subject FROM forge_jobs WHERE id = $1",
            job_id
        )
        .fetch_optional(&self.pool)
        .await?;

        let (status, owner_subject) = match row {
            Some(r) => (r.status, r.owner_subject),
            None => return Ok(false),
        };

        // Verify ownership: if job has an owner, caller must match.
        // Reject if no caller_subject is provided for an owned job.
        if let Some(ref owner) = owner_subject {
            match caller_subject {
                Some(caller) if caller == owner => { /* authorized */ }
                _ => return Ok(false), // no caller or mismatch -> deny
            }
        }

        let terminal_statuses = [
            JobStatus::Completed.as_str(),
            JobStatus::Failed.as_str(),
            JobStatus::DeadLetter.as_str(),
            JobStatus::Cancelled.as_str(),
        ];

        // Running jobs cannot be cancelled unilaterally: flag them as
        // cancel_requested and let the worker wind down cooperatively.
        if status == JobStatus::Running.as_str() {
            let updated = sqlx::query!(
                r#"
                UPDATE forge_jobs
                SET
                    status = 'cancel_requested',
                    cancel_requested_at = NOW(),
                    cancel_reason = COALESCE($2, cancel_reason)
                WHERE id = $1
                  AND status = 'running'
                "#,
                job_id,
                reason,
            )
            .execute(&self.pool)
            .await?;

            return Ok(updated.rows_affected() > 0);
        }

        if terminal_statuses.contains(&status.as_str()) {
            return Ok(false);
        }

        // Pending / claimed / cancel_requested jobs can be moved straight
        // to cancelled (the WHERE clause re-excludes terminal states).
        let updated = sqlx::query!(
            r#"
            UPDATE forge_jobs
            SET
                status = 'cancelled',
                cancelled_at = NOW(),
                cancel_reason = COALESCE($2, cancel_reason)
            WHERE id = $1
              AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
            "#,
            job_id,
            reason,
        )
        .execute(&self.pool)
        .await?;

        Ok(updated.rows_affected() > 0)
    }

    /// Mark job as cancelled unconditionally (no ownership or status check —
    /// intended for trusted/internal callers; contrast with
    /// [`request_cancel`](Self::request_cancel)).
    ///
    /// If `ttl` is provided, sets `expires_at` for automatic cleanup.
    pub async fn cancel(
        &self,
        job_id: Uuid,
        reason: Option<&str>,
        ttl: Option<std::time::Duration>,
    ) -> Result<(), sqlx::Error> {
        // Same oversized-Duration fallback as `complete`: default to 7 days.
        let expires_at = ttl.map(|d| {
            chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
        });

        sqlx::query!(
            r#"
            UPDATE forge_jobs
            SET
                status = 'cancelled',
                cancelled_at = NOW(),
                cancel_reason = COALESCE($2, cancel_reason),
                expires_at = $3
            WHERE id = $1
            "#,
            job_id,
            reason,
            expires_at,
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Release stale jobs back to pending, returning how many were released.
    ///
    /// A job is stale when it was claimed but never progressed past
    /// `claimed_at`, or when it is running and its most recent liveness
    /// signal (heartbeat, falling back to started/claimed time) is older
    /// than `stale_threshold`. The released job keeps its `attempts` count.
    pub async fn release_stale(
        &self,
        stale_threshold: chrono::Duration,
    ) -> Result<u64, sqlx::Error> {
        let result = sqlx::query!(
            r#"
            UPDATE forge_jobs
            SET
                status = 'pending',
                worker_id = NULL,
                claimed_at = NULL,
                started_at = NULL,
                last_heartbeat = NULL
            WHERE
                (
                    status = 'claimed'
                    AND claimed_at < NOW() - make_interval(secs => $1)
                )
                OR (
                    status = 'running'
                    AND COALESCE(last_heartbeat, started_at, claimed_at) < NOW() - make_interval(secs => $1)
                )
            "#,
            stale_threshold.num_seconds() as f64,
        )
        .execute(&self.pool)
        .await?;

        Ok(result.rows_affected())
    }

    /// Delete expired job records, returning how many were deleted.
    ///
    /// Only deletes terminal jobs (completed, cancelled, failed, dead_letter)
    /// that have passed their TTL (`expires_at` in the past).
    pub async fn cleanup_expired(&self) -> Result<u64, sqlx::Error> {
        let result = sqlx::query!(
            r#"
            DELETE FROM forge_jobs
            WHERE expires_at IS NOT NULL
              AND expires_at < NOW()
              AND status IN ('completed', 'cancelled', 'failed', 'dead_letter')
            "#,
        )
        .execute(&self.pool)
        .await?;

        Ok(result.rows_affected())
    }

    /// Get queue statistics: a per-status row count over the whole table.
    pub async fn stats(&self) -> Result<QueueStats, sqlx::Error> {
        let row = sqlx::query!(
            r#"
            SELECT
                COUNT(*) FILTER (WHERE status = 'pending') as "pending!",
                COUNT(*) FILTER (WHERE status = 'claimed') as "claimed!",
                COUNT(*) FILTER (WHERE status = 'running') as "running!",
                COUNT(*) FILTER (WHERE status = 'completed') as "completed!",
                COUNT(*) FILTER (WHERE status = 'cancelled') as "cancelled!",
                COUNT(*) FILTER (WHERE status = 'failed') as "failed!",
                COUNT(*) FILTER (WHERE status = 'dead_letter') as "dead_letter!"
            FROM forge_jobs
            "#,
        )
        .fetch_one(&self.pool)
        .await?;

        // COUNT(*) is i64 on the Rust side but can never be negative, so
        // the as-casts to u64 are lossless here.
        Ok(QueueStats {
            pending: row.pending as u64,
            claimed: row.claimed as u64,
            running: row.running as u64,
            completed: row.completed as u64,
            cancelled: row.cancelled as u64,
            failed: row.failed as u64,
            dead_letter: row.dead_letter as u64,
        })
    }
}
632
/// Queue statistics.
///
/// Per-status row counts for the `forge_jobs` table, as returned by
/// `JobQueue::stats`.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Jobs waiting to be claimed.
    pub pending: u64,
    /// Jobs claimed by a worker but not yet running.
    pub claimed: u64,
    /// Jobs currently executing.
    pub running: u64,
    /// Jobs that finished successfully.
    pub completed: u64,
    /// Jobs that were cancelled.
    pub cancelled: u64,
    /// Jobs in the failed state.
    pub failed: u64,
    /// Jobs moved to the dead-letter state.
    pub dead_letter: u64,
}
644
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand: a record with an empty JSON object as input.
    fn make(job_type: &str, priority: JobPriority, max_attempts: i32) -> JobRecord {
        JobRecord::new(job_type, serde_json::json!({}), priority, max_attempts)
    }

    #[test]
    fn test_job_record_creation() {
        let rec = make("send_email", JobPriority::Normal, 3);

        assert_eq!(rec.job_type, "send_email");
        assert_eq!(rec.status, JobStatus::Pending);
        assert_eq!(rec.priority, 50);
        assert_eq!(rec.attempts, 0);
        assert_eq!(rec.max_attempts, 3);
    }

    #[test]
    fn test_job_record_with_capability() {
        let rec = make("transcode", JobPriority::High, 3).with_capability("media");

        assert_eq!(rec.worker_capability, Some("media".to_string()));
        assert_eq!(rec.priority, 75);
    }

    #[test]
    fn test_job_record_with_idempotency() {
        let rec = make("payment", JobPriority::Critical, 5).with_idempotency_key("payment-123");

        assert_eq!(rec.idempotency_key, Some("payment-123".to_string()));
    }

    #[test]
    fn test_job_record_with_owner_subject() {
        let rec = make("task", JobPriority::Normal, 3).with_owner_subject(Some("user-123".into()));

        assert_eq!(rec.owner_subject, Some("user-123".to_string()));
    }

    #[test]
    fn test_priority_ordering() {
        // Numeric priorities, ordered from lowest to highest level.
        let by_level = vec![
            make("a", JobPriority::Background, 1).priority,
            make("b", JobPriority::Low, 1).priority,
            make("c", JobPriority::Normal, 1).priority,
            make("d", JobPriority::High, 1).priority,
            make("e", JobPriority::Critical, 1).priority,
        ];

        // Each successive level must map to a strictly greater value.
        for pair in by_level.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }
}
698
699/// Integration tests requiring a real PostgreSQL database.
700/// Run with: cargo test -p forge-runtime --features testcontainers
701#[cfg(all(test, feature = "testcontainers"))]
702#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::panic)]
703mod integration_tests {
704    use super::*;
705    use forge_core::testing::{IsolatedTestDb, TestDatabase};
706
707    async fn setup_db(test_name: &str) -> IsolatedTestDb {
708        let base = TestDatabase::from_env()
709            .await
710            .expect("Failed to create test database");
711        let db = base
712            .isolated(test_name)
713            .await
714            .expect("Failed to create isolated db");
715        let system_sql = crate::migrations::get_all_system_sql();
716        db.run_sql(&system_sql)
717            .await
718            .expect("Failed to apply system schema");
719        db
720    }
721
722    #[tokio::test]
723    async fn enqueue_and_claim_job() {
724        let db = setup_db("enqueue_and_claim").await;
725        let queue = JobQueue::new(db.pool().clone());
726        let worker_id = Uuid::new_v4();
727
728        // Enqueue a job
729        let job = JobRecord::new(
730            "send_email",
731            serde_json::json!({"to": "a@b.com"}),
732            JobPriority::Normal,
733            3,
734        );
735        let job_id = queue.enqueue(job).await.expect("Failed to enqueue");
736
737        // Claim it
738        let claimed = queue
739            .claim(worker_id, &[], 10)
740            .await
741            .expect("Failed to claim");
742        assert_eq!(claimed.len(), 1);
743        assert_eq!(claimed[0].id, job_id);
744        assert_eq!(claimed[0].job_type, "send_email");
745        assert_eq!(claimed[0].status, JobStatus::Claimed);
746        assert_eq!(claimed[0].attempts, 1);
747        assert!(claimed[0].worker_id.is_some());
748
749        db.cleanup().await.expect("cleanup");
750    }
751
752    #[tokio::test]
753    async fn claim_respects_skip_locked() {
754        let db = setup_db("claim_skip_locked").await;
755        let queue = JobQueue::new(db.pool().clone());
756
757        // Enqueue 3 jobs
758        for i in 0..3 {
759            let job = JobRecord::new(
760                format!("job_{i}"),
761                serde_json::json!({}),
762                JobPriority::Normal,
763                3,
764            );
765            queue.enqueue(job).await.expect("enqueue");
766        }
767
768        // Worker 1 claims 2
769        let worker1 = Uuid::new_v4();
770        let batch1 = queue.claim(worker1, &[], 2).await.expect("claim1");
771        assert_eq!(batch1.len(), 2);
772
773        // Worker 2 claims remaining
774        let worker2 = Uuid::new_v4();
775        let batch2 = queue.claim(worker2, &[], 2).await.expect("claim2");
776        assert_eq!(batch2.len(), 1);
777
778        // No overlap
779        let ids1: Vec<Uuid> = batch1.iter().map(|j| j.id).collect();
780        let ids2: Vec<Uuid> = batch2.iter().map(|j| j.id).collect();
781        for id in &ids2 {
782            assert!(
783                !ids1.contains(id),
784                "SKIP LOCKED should prevent duplicate claims"
785            );
786        }
787
788        db.cleanup().await.expect("cleanup");
789    }
790
791    #[tokio::test]
792    async fn claim_respects_priority_ordering() {
793        let db = setup_db("claim_priority").await;
794        let queue = JobQueue::new(db.pool().clone());
795        let worker_id = Uuid::new_v4();
796
797        // Enqueue low then high priority
798        let low = JobRecord::new("low_job", serde_json::json!({}), JobPriority::Low, 3);
799        queue.enqueue(low).await.expect("enqueue low");
800
801        let high = JobRecord::new("high_job", serde_json::json!({}), JobPriority::Critical, 3);
802        queue.enqueue(high).await.expect("enqueue high");
803
804        // Claim 1 - should get the high-priority job first
805        let claimed = queue.claim(worker_id, &[], 1).await.expect("claim");
806        assert_eq!(claimed.len(), 1);
807        assert_eq!(claimed[0].job_type, "high_job");
808
809        db.cleanup().await.expect("cleanup");
810    }
811
812    #[tokio::test]
813    async fn complete_job_lifecycle() {
814        let db = setup_db("complete_lifecycle").await;
815        let queue = JobQueue::new(db.pool().clone());
816        let worker_id = Uuid::new_v4();
817
818        let job = JobRecord::new("process", serde_json::json!({}), JobPriority::Normal, 3);
819        let job_id = queue.enqueue(job).await.expect("enqueue");
820
821        // Claim
822        queue.claim(worker_id, &[], 1).await.expect("claim");
823
824        // Start
825        queue.start(job_id).await.expect("start");
826
827        // Complete
828        queue
829            .complete(job_id, serde_json::json!({"result": "done"}), None)
830            .await
831            .expect("complete");
832
833        // Verify via stats
834        let stats = queue.stats().await.expect("stats");
835        assert_eq!(stats.completed, 1);
836        assert_eq!(stats.pending, 0);
837
838        db.cleanup().await.expect("cleanup");
839    }
840
841    #[tokio::test]
842    async fn fail_with_retry_requeues_as_pending() {
843        let db = setup_db("fail_retry").await;
844        let queue = JobQueue::new(db.pool().clone());
845        let worker_id = Uuid::new_v4();
846
847        let job = JobRecord::new("flaky", serde_json::json!({}), JobPriority::Normal, 3);
848        let job_id = queue.enqueue(job).await.expect("enqueue");
849
850        queue.claim(worker_id, &[], 1).await.expect("claim");
851        queue.start(job_id).await.expect("start");
852
853        // Fail with retry delay
854        queue
855            .fail(
856                job_id,
857                "transient error",
858                Some(chrono::Duration::seconds(0)),
859                None,
860            )
861            .await
862            .expect("fail");
863
864        // Should be pending again (retryable)
865        let stats = queue.stats().await.expect("stats");
866        assert_eq!(stats.pending, 1);
867        assert_eq!(stats.dead_letter, 0);
868
869        db.cleanup().await.expect("cleanup");
870    }
871
872    #[tokio::test]
873    async fn fail_without_retry_goes_to_dead_letter() {
874        let db = setup_db("fail_dead_letter").await;
875        let queue = JobQueue::new(db.pool().clone());
876        let worker_id = Uuid::new_v4();
877
878        let job = JobRecord::new("fatal", serde_json::json!({}), JobPriority::Normal, 1);
879        let job_id = queue.enqueue(job).await.expect("enqueue");
880
881        queue.claim(worker_id, &[], 1).await.expect("claim");
882        queue.start(job_id).await.expect("start");
883
884        // Fail without retry (None delay) -> dead letter
885        queue
886            .fail(job_id, "permanent error", None, None)
887            .await
888            .expect("fail");
889
890        let stats = queue.stats().await.expect("stats");
891        assert_eq!(stats.dead_letter, 1);
892        assert_eq!(stats.pending, 0);
893
894        db.cleanup().await.expect("cleanup");
895    }
896
897    #[tokio::test]
898    async fn idempotency_key_deduplicates() {
899        let db = setup_db("idempotency").await;
900        let queue = JobQueue::new(db.pool().clone());
901
902        let job1 = JobRecord::new("pay", serde_json::json!({}), JobPriority::Normal, 3)
903            .with_idempotency_key("pay-123");
904        let id1 = queue.enqueue(job1).await.expect("enqueue1");
905
906        // Same key -> returns existing job ID
907        let job2 = JobRecord::new(
908            "pay",
909            serde_json::json!({"amount": 200}),
910            JobPriority::Normal,
911            3,
912        )
913        .with_idempotency_key("pay-123");
914        let id2 = queue.enqueue(job2).await.expect("enqueue2");
915
916        assert_eq!(id1, id2, "Idempotency key should return same job ID");
917
918        // Only one job should exist
919        let stats = queue.stats().await.expect("stats");
920        assert_eq!(stats.pending, 1);
921
922        db.cleanup().await.expect("cleanup");
923    }
924
925    #[tokio::test]
926    async fn cancel_pending_job() {
927        let db = setup_db("cancel_pending").await;
928        let queue = JobQueue::new(db.pool().clone());
929
930        let job = JobRecord::new("task", serde_json::json!({}), JobPriority::Normal, 3);
931        let job_id = queue.enqueue(job).await.expect("enqueue");
932
933        let cancelled = queue
934            .request_cancel(job_id, Some("no longer needed"), None)
935            .await
936            .expect("cancel");
937        assert!(cancelled);
938
939        let stats = queue.stats().await.expect("stats");
940        assert_eq!(stats.cancelled, 1);
941        assert_eq!(stats.pending, 0);
942
943        db.cleanup().await.expect("cleanup");
944    }
945
946    #[tokio::test]
947    async fn cancel_respects_ownership() {
948        let db = setup_db("cancel_ownership").await;
949        let queue = JobQueue::new(db.pool().clone());
950
951        let job = JobRecord::new("task", serde_json::json!({}), JobPriority::Normal, 3)
952            .with_owner_subject(Some("user-alice".into()));
953        let job_id = queue.enqueue(job).await.expect("enqueue");
954
955        // Wrong owner can't cancel
956        let denied = queue
957            .request_cancel(job_id, Some("reason"), Some("user-bob"))
958            .await
959            .expect("cancel attempt");
960        assert!(!denied, "Should deny cancellation from non-owner");
961
962        // Right owner can cancel
963        let allowed = queue
964            .request_cancel(job_id, Some("reason"), Some("user-alice"))
965            .await
966            .expect("cancel");
967        assert!(allowed, "Should allow owner to cancel");
968
969        db.cleanup().await.expect("cleanup");
970    }
971
972    #[tokio::test]
973    async fn claim_respects_worker_capability() {
974        let db = setup_db("claim_capability").await;
975        let queue = JobQueue::new(db.pool().clone());
976
977        // Job requires "gpu" capability
978        let job = JobRecord::new("render", serde_json::json!({}), JobPriority::Normal, 3)
979            .with_capability("gpu");
980        queue.enqueue(job).await.expect("enqueue");
981
982        // Worker without capability can't claim
983        let worker_no_cap = Uuid::new_v4();
984        let claimed = queue
985            .claim(worker_no_cap, &["cpu".into()], 10)
986            .await
987            .expect("claim");
988        assert!(
989            claimed.is_empty(),
990            "Worker without gpu cap should not claim gpu job"
991        );
992
993        // Worker with capability can claim
994        let worker_with_cap = Uuid::new_v4();
995        let claimed = queue
996            .claim(worker_with_cap, &["gpu".into()], 10)
997            .await
998            .expect("claim");
999        assert_eq!(claimed.len(), 1);
1000
1001        db.cleanup().await.expect("cleanup");
1002    }
1003
1004    #[tokio::test]
1005    async fn heartbeat_updates_timestamp() {
1006        let db = setup_db("heartbeat").await;
1007        let queue = JobQueue::new(db.pool().clone());
1008        let worker_id = Uuid::new_v4();
1009
1010        let job = JobRecord::new("long_task", serde_json::json!({}), JobPriority::Normal, 3);
1011        let job_id = queue.enqueue(job).await.expect("enqueue");
1012        queue.claim(worker_id, &[], 1).await.expect("claim");
1013        queue.start(job_id).await.expect("start");
1014
1015        // Heartbeat should not error
1016        queue.heartbeat(job_id).await.expect("heartbeat");
1017
1018        db.cleanup().await.expect("cleanup");
1019    }
1020
1021    #[tokio::test]
1022    async fn progress_updates_persist() {
1023        let db = setup_db("progress").await;
1024        let queue = JobQueue::new(db.pool().clone());
1025        let worker_id = Uuid::new_v4();
1026
1027        let job = JobRecord::new("export", serde_json::json!({}), JobPriority::Normal, 3);
1028        let job_id = queue.enqueue(job).await.expect("enqueue");
1029        queue.claim(worker_id, &[], 1).await.expect("claim");
1030        queue.start(job_id).await.expect("start");
1031
1032        queue
1033            .update_progress(job_id, 50, "Processing...")
1034            .await
1035            .expect("progress");
1036        queue
1037            .update_progress(job_id, 100, "Done")
1038            .await
1039            .expect("progress");
1040
1041        // Verify via direct query (using runtime query since .sqlx/ lacks metadata for ad-hoc queries)
1042        let row: (Option<i32>, Option<String>) = sqlx::query_as(
1043            "SELECT progress_percent, progress_message FROM forge_jobs WHERE id = $1",
1044        )
1045        .bind(job_id)
1046        .fetch_one(db.pool())
1047        .await
1048        .expect("query");
1049        assert_eq!(row.0, Some(100));
1050        assert_eq!(row.1.as_deref(), Some("Done"));
1051
1052        db.cleanup().await.expect("cleanup");
1053    }
1054
1055    #[tokio::test]
1056    async fn queue_stats_accurate() {
1057        let db = setup_db("stats").await;
1058        let queue = JobQueue::new(db.pool().clone());
1059
1060        // Start empty
1061        let stats = queue.stats().await.expect("stats");
1062        assert_eq!(stats.pending, 0);
1063
1064        // Add 3 pending
1065        for _ in 0..3 {
1066            let job = JobRecord::new("task", serde_json::json!({}), JobPriority::Normal, 3);
1067            queue.enqueue(job).await.expect("enqueue");
1068        }
1069
1070        let stats = queue.stats().await.expect("stats");
1071        assert_eq!(stats.pending, 3);
1072        assert_eq!(stats.running, 0);
1073        assert_eq!(stats.completed, 0);
1074
1075        db.cleanup().await.expect("cleanup");
1076    }
1077}