use chrono::{DateTime, Utc};
use forge_core::job::{JobPriority, JobStatus};
use uuid::Uuid;
/// One row of the `forge_jobs` table.
///
/// Mirrors the persisted job state: payloads (`input` / `output` /
/// `job_context`), retry and scheduling bookkeeping, worker assignment,
/// and the full set of lifecycle timestamps.
#[derive(Debug, Clone)]
pub struct JobRecord {
    // Primary key; generated client-side (v4 UUID) in `JobRecord::new`.
    pub id: Uuid,
    // Dispatch key workers use to pick a handler.
    pub job_type: String,
    // Caller-supplied job payload (arbitrary JSON).
    pub input: serde_json::Value,
    // Result payload; populated by `JobQueue::complete`.
    pub output: Option<serde_json::Value>,
    // Mutable scratch document; replaced wholesale by `JobQueue::set_context`.
    pub job_context: serde_json::Value,
    // Current lifecycle state (stored as a string column, parsed on read).
    pub status: JobStatus,
    // Numeric priority derived from `JobPriority::as_i32`; higher claims first.
    pub priority: i32,
    // Number of times the job has been *claimed* (incremented on claim, not completion).
    pub attempts: i32,
    // Retry budget; not enforced by the queue itself — callers decide retry vs dead-letter.
    pub max_attempts: i32,
    // Most recent failure message, if any.
    pub last_error: Option<String>,
    // Required worker capability; None means any worker may claim the job.
    pub worker_capability: Option<String>,
    // Worker currently holding the job, if claimed.
    pub worker_id: Option<Uuid>,
    // Deduplication key; `enqueue` returns the existing active job for a duplicate key.
    pub idempotency_key: Option<String>,
    // Subject allowed to cancel the job; None means anyone may cancel.
    pub owner_subject: Option<String>,
    // Earliest time the job becomes claimable.
    pub scheduled_at: DateTime<Utc>,
    pub created_at: DateTime<Utc>,
    // Lifecycle timestamps, stamped by the corresponding queue transitions.
    pub claimed_at: Option<DateTime<Utc>>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub failed_at: Option<DateTime<Utc>>,
    // Liveness signal refreshed by `heartbeat`; drives `release_stale`.
    pub last_heartbeat: Option<DateTime<Utc>>,
    // Cooperative-cancellation bookkeeping.
    pub cancel_requested_at: Option<DateTime<Utc>>,
    pub cancelled_at: Option<DateTime<Utc>>,
    pub cancel_reason: Option<String>,
}
impl JobRecord {
    /// Creates a new `Pending` job with a freshly generated id.
    ///
    /// `scheduled_at` and `created_at` are taken from a single clock reading
    /// so they are exactly equal at creation time. (The previous version
    /// called `Utc::now()` twice, which could leave the two timestamps a few
    /// microseconds apart — even `scheduled_at < created_at`.)
    pub fn new(
        job_type: impl Into<String>,
        input: serde_json::Value,
        priority: JobPriority,
        max_attempts: i32,
    ) -> Self {
        let now = Utc::now();
        Self {
            id: Uuid::new_v4(),
            job_type: job_type.into(),
            input,
            output: None,
            job_context: serde_json::json!({}),
            status: JobStatus::Pending,
            priority: priority.as_i32(),
            attempts: 0,
            max_attempts,
            last_error: None,
            worker_capability: None,
            worker_id: None,
            idempotency_key: None,
            owner_subject: None,
            scheduled_at: now,
            created_at: now,
            claimed_at: None,
            started_at: None,
            completed_at: None,
            failed_at: None,
            last_heartbeat: None,
            cancel_requested_at: None,
            cancelled_at: None,
            cancel_reason: None,
        }
    }

    /// Restricts the job to workers advertising `capability`.
    pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
        self.worker_capability = Some(capability.into());
        self
    }

    /// Defers the job until `at`; the default is "now" (immediately claimable).
    pub fn with_scheduled_at(mut self, at: DateTime<Utc>) -> Self {
        self.scheduled_at = at;
        self
    }

    /// Sets a deduplication key. `JobQueue::enqueue` returns the existing
    /// job's id while a job with the same key is still active.
    pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
        self.idempotency_key = Some(key.into());
        self
    }

    /// Records the subject that owns the job; `JobQueue::request_cancel`
    /// only honors cancellation from this subject when it is set.
    pub fn with_owner_subject(mut self, owner_subject: Option<String>) -> Self {
        self.owner_subject = owner_subject;
        self
    }
}
/// Postgres-backed job queue over the `forge_jobs` table.
///
/// Cloning is cheap: `sqlx::PgPool` is internally reference-counted.
#[derive(Clone)]
pub struct JobQueue {
    pool: sqlx::PgPool,
}
impl JobQueue {
pub fn new(pool: sqlx::PgPool) -> Self {
Self { pool }
}
/// Inserts a job and returns its id.
///
/// If the job carries an idempotency key and another job with the same key
/// is still *active* (not completed/failed/dead_letter/cancelled), the
/// existing job's id is returned and no new row is inserted.
///
/// NOTE(review): the key lookup and the INSERT are two separate statements
/// with no transaction or visible unique constraint, so two concurrent
/// enqueues with the same key can race and both insert — confirm a partial
/// unique index on `idempotency_key` exists, or switch to
/// `INSERT .. ON CONFLICT`.
pub async fn enqueue(&self, job: JobRecord) -> Result<Uuid, sqlx::Error> {
    if let Some(ref key) = job.idempotency_key {
        // Dedup check: reuse any still-active job with the same key.
        let existing = sqlx::query_scalar!(
            r#"
SELECT id FROM forge_jobs
WHERE idempotency_key = $1
AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
"#,
            key
        )
        .fetch_optional(&self.pool)
        .await?;
        if let Some(id) = existing {
            return Ok(id); }
    }
    // `as _` casts let sqlx bind serde_json::Value / Option<String> columns.
    sqlx::query!(
        r#"
INSERT INTO forge_jobs (
id, job_type, input, job_context, status, priority, attempts, max_attempts,
worker_capability, idempotency_key, owner_subject, scheduled_at, created_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
)
"#,
        job.id,
        &job.job_type,
        job.input as _,
        job.job_context as _,
        job.status.as_str(),
        job.priority,
        job.attempts,
        job.max_attempts,
        job.worker_capability as _,
        job.idempotency_key as _,
        job.owner_subject as _,
        job.scheduled_at,
        job.created_at,
    )
    .execute(&self.pool)
    .await?;
    Ok(job.id)
}
/// Atomically claims up to `limit` runnable jobs for `worker_id`.
///
/// Eligible jobs are `pending`, due (`scheduled_at <= NOW()`), and either
/// match one of the worker's `capabilities` or carry no capability
/// requirement. Selection orders by priority (descending), then schedule
/// time. `FOR UPDATE SKIP LOCKED` lets concurrent workers claim disjoint
/// sets without blocking each other.
///
/// Claiming sets status to `claimed`, stamps `claimed_at`, and increments
/// `attempts` — so `attempts` counts claims, not completed executions.
pub async fn claim(
    &self,
    worker_id: Uuid,
    capabilities: &[String],
    limit: i32,
) -> Result<Vec<JobRecord>, sqlx::Error> {
    let rows = sqlx::query!(
        r#"
WITH claimable AS (
SELECT id
FROM forge_jobs
WHERE status = 'pending'
AND scheduled_at <= NOW()
AND (worker_capability = ANY($2) OR worker_capability IS NULL)
ORDER BY priority DESC, scheduled_at ASC
LIMIT $3
FOR UPDATE SKIP LOCKED
)
UPDATE forge_jobs
SET
status = 'claimed',
worker_id = $1,
claimed_at = NOW(),
attempts = attempts + 1
WHERE id IN (SELECT id FROM claimable)
RETURNING
id, job_type, input, output, job_context, status, priority,
attempts, max_attempts, last_error, worker_capability,
worker_id, idempotency_key, owner_subject, scheduled_at, created_at,
claimed_at, started_at, completed_at, failed_at, last_heartbeat,
cancel_requested_at, cancelled_at, cancel_reason
"#,
        worker_id,
        capabilities,
        // Postgres LIMIT binds as bigint.
        limit as i64,
    )
    .fetch_all(&self.pool)
    .await?;
    // Map the anonymous sqlx record into the public JobRecord shape.
    let jobs = rows
        .into_iter()
        .map(|row| JobRecord {
            id: row.id,
            job_type: row.job_type,
            input: row.input,
            output: row.output,
            job_context: row.job_context,
            // NOTE(review): an unparseable status string is silently coerced
            // to Failed here, masking data corruption — consider surfacing
            // an error instead.
            status: row
                .status
                .parse()
                .unwrap_or(forge_core::job::JobStatus::Failed),
            priority: row.priority,
            attempts: row.attempts,
            max_attempts: row.max_attempts,
            last_error: row.last_error,
            worker_capability: row.worker_capability,
            worker_id: row.worker_id,
            idempotency_key: row.idempotency_key,
            owner_subject: row.owner_subject,
            scheduled_at: row.scheduled_at,
            created_at: row.created_at,
            claimed_at: row.claimed_at,
            started_at: row.started_at,
            completed_at: row.completed_at,
            failed_at: row.failed_at,
            last_heartbeat: row.last_heartbeat,
            cancel_requested_at: row.cancel_requested_at,
            cancelled_at: row.cancelled_at,
            cancel_reason: row.cancel_reason,
        })
        .collect();
    Ok(jobs)
}
/// Marks a claimed job as `running` and initializes its heartbeat.
///
/// Jobs already in `cancel_requested` or `cancelled` are refused.
///
/// # Errors
/// Returns `sqlx::Error::RowNotFound` when no row was updated — i.e. the
/// job does not exist *or* it was cancelled; callers cannot distinguish
/// the two cases from this error alone.
pub async fn start(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
    let result = sqlx::query!(
        r#"
UPDATE forge_jobs
SET status = 'running', started_at = NOW(), last_heartbeat = NOW()
WHERE id = $1
AND status NOT IN ('cancel_requested', 'cancelled')
"#,
        job_id,
    )
    .execute(&self.pool)
    .await?;
    if result.rows_affected() == 0 {
        return Err(sqlx::Error::RowNotFound);
    }
    Ok(())
}
/// Finalizes a job as `completed`, storing its `output`.
///
/// Cancellation markers are cleared: a job that finished before its worker
/// observed a cancel request counts as completed. When `ttl` is given,
/// `expires_at` is set so `cleanup_expired` can later delete the row.
///
/// NOTE(review): a `ttl` too large for `chrono::Duration` silently falls
/// back to 7 days, and the UPDATE is unconditional on status (it would
/// overwrite an already-cancelled row) — confirm both are intended.
pub async fn complete(
    &self,
    job_id: Uuid,
    output: serde_json::Value,
    ttl: Option<std::time::Duration>,
) -> Result<(), sqlx::Error> {
    let expires_at = ttl.map(|d| {
        chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
    });
    sqlx::query!(
        r#"
UPDATE forge_jobs
SET
status = 'completed',
output = $2,
completed_at = NOW(),
cancel_requested_at = NULL,
cancelled_at = NULL,
cancel_reason = NULL,
expires_at = $3
WHERE id = $1
"#,
        job_id,
        output as _,
        expires_at,
    )
    .execute(&self.pool)
    .await?;
    Ok(())
}
/// Records a job failure.
///
/// With `retry_delay`, the job returns to `pending` rescheduled after the
/// delay: worker assignment and cancel markers are cleared and the error
/// text is kept in `last_error`. Without a delay, the job moves to
/// `dead_letter` with `failed_at` stamped and an optional retention TTL.
///
/// NOTE(review): `max_attempts` is not enforced here — the caller decides
/// retry vs dead-letter. The retry path also wipes `cancel_requested_at`,
/// so a cancel requested while the job ran is forgotten when it fails with
/// retry — confirm that is intended.
pub async fn fail(
    &self,
    job_id: Uuid,
    error: &str,
    retry_delay: Option<chrono::Duration>,
    ttl: Option<std::time::Duration>,
) -> Result<(), sqlx::Error> {
    if let Some(delay) = retry_delay {
        sqlx::query!(
            r#"
UPDATE forge_jobs
SET
status = 'pending',
worker_id = NULL,
claimed_at = NULL,
started_at = NULL,
last_error = $2,
scheduled_at = NOW() + make_interval(secs => $3),
cancel_requested_at = NULL,
cancelled_at = NULL,
cancel_reason = NULL
WHERE id = $1
"#,
            job_id,
            error,
            // make_interval(secs => ...) takes double-precision seconds.
            delay.num_seconds() as f64,
        )
        .execute(&self.pool)
        .await?;
    } else {
        // Terminal failure: dead-letter with optional retention window.
        // NOTE(review): an oversized ttl silently falls back to 7 days.
        let expires_at = ttl.map(|d| {
            chrono::Utc::now()
                + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
        });
        sqlx::query!(
            r#"
UPDATE forge_jobs
SET
status = 'dead_letter',
last_error = $2,
failed_at = NOW(),
cancel_requested_at = NULL,
cancelled_at = NULL,
cancel_reason = NULL,
expires_at = $3
WHERE id = $1
"#,
            job_id,
            error,
            expires_at,
        )
        .execute(&self.pool)
        .await?;
    }
    Ok(())
}
/// Refreshes `last_heartbeat` for a job; `release_stale` uses this
/// timestamp to detect dead workers. An unknown id is silently ignored
/// (zero rows updated is not treated as an error).
pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"
UPDATE forge_jobs
SET last_heartbeat = NOW()
WHERE id = $1
"#,
        job_id,
    )
    .execute(&self.pool)
    .await?;
    Ok(())
}
/// Records progress for a job and refreshes its heartbeat as a side effect.
///
/// `percent` is clamped to `0..=100` so out-of-range caller values cannot
/// persist a nonsensical progress figure (the previous version wrote the
/// raw value straight through). An unknown id is silently ignored.
pub async fn update_progress(
    &self,
    job_id: Uuid,
    percent: i32,
    message: &str,
) -> Result<(), sqlx::Error> {
    // Defensive clamp: progress is a percentage by contract.
    let percent = percent.clamp(0, 100);
    sqlx::query!(
        r#"
UPDATE forge_jobs
SET progress_percent = $2, progress_message = $3, last_heartbeat = NOW()
WHERE id = $1
"#,
        job_id,
        percent,
        message,
    )
    .execute(&self.pool)
    .await?;
    Ok(())
}
/// Replaces the job's entire `job_context` JSON document (no merge —
/// callers must supply the full desired document).
pub async fn set_context(
    &self,
    job_id: Uuid,
    context: serde_json::Value,
) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"
UPDATE forge_jobs
SET job_context = $2
WHERE id = $1
"#,
        job_id,
        context as _,
    )
    .execute(&self.pool)
    .await?;
    Ok(())
}
/// Requests cancellation of a job on behalf of `caller_subject`.
///
/// Authorization: if the job has an `owner_subject`, only that exact
/// subject may cancel; jobs without an owner may be cancelled by anyone,
/// including anonymous (`None`) callers.
///
/// State semantics:
/// - `running` -> `cancel_requested` (cooperative: the worker is expected
///   to notice and finalize via `cancel`);
/// - any other non-terminal state -> `cancelled` immediately.
///
/// Returns `Ok(false)` when the job is missing, the caller is not
/// authorized, or the job is already terminal.
///
/// NOTE(review): status is read in a separate statement from the updates,
/// so a job observed as `pending` here can start running in between; the
/// fallback UPDATE would then cancel it outright rather than going through
/// `cancel_requested` — confirm workers tolerate that.
pub async fn request_cancel(
    &self,
    job_id: Uuid,
    reason: Option<&str>,
    caller_subject: Option<&str>,
) -> Result<bool, sqlx::Error> {
    let row = sqlx::query!(
        "SELECT status, owner_subject FROM forge_jobs WHERE id = $1",
        job_id
    )
    .fetch_optional(&self.pool)
    .await?;
    let (status, owner_subject) = match row {
        Some(r) => (r.status, r.owner_subject),
        None => return Ok(false),
    };
    // Ownership gate (application-side): an owned job requires an exact
    // subject match; anything else is a silent denial.
    if let Some(ref owner) = owner_subject {
        match caller_subject {
            Some(caller) if caller == owner => { }
            _ => return Ok(false), }
    }
    let terminal_statuses = [
        JobStatus::Completed.as_str(),
        JobStatus::Failed.as_str(),
        JobStatus::DeadLetter.as_str(),
        JobStatus::Cancelled.as_str(),
    ];
    if status == JobStatus::Running.as_str() {
        // Cooperative path: flag the running job; the status guard in the
        // WHERE clause re-checks in case it transitioned since the SELECT.
        let updated = sqlx::query!(
            r#"
UPDATE forge_jobs
SET
status = 'cancel_requested',
cancel_requested_at = NOW(),
cancel_reason = COALESCE($2, cancel_reason)
WHERE id = $1
AND status = 'running'
"#,
            job_id,
            reason,
        )
        .execute(&self.pool)
        .await?;
        return Ok(updated.rows_affected() > 0);
    }
    if terminal_statuses.contains(&status.as_str()) {
        return Ok(false);
    }
    // Not yet running: cancel immediately (guard re-excludes terminal states).
    let updated = sqlx::query!(
        r#"
UPDATE forge_jobs
SET
status = 'cancelled',
cancelled_at = NOW(),
cancel_reason = COALESCE($2, cancel_reason)
WHERE id = $1
AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
"#,
        job_id,
        reason,
    )
    .execute(&self.pool)
    .await?;
    Ok(updated.rows_affected() > 0)
}
/// Finalizes a job as `cancelled`, typically called by a worker after it
/// observed a `cancel_requested` state. A provided `reason` overrides any
/// reason recorded at request time; `ttl` optionally sets a retention
/// window for `cleanup_expired`.
///
/// NOTE(review): the UPDATE is unconditional on status, so it would also
/// overwrite completed/failed rows if invoked with the wrong id — confirm
/// callers only use this for jobs they currently hold. An oversized `ttl`
/// silently falls back to 7 days.
pub async fn cancel(
    &self,
    job_id: Uuid,
    reason: Option<&str>,
    ttl: Option<std::time::Duration>,
) -> Result<(), sqlx::Error> {
    let expires_at = ttl.map(|d| {
        chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
    });
    sqlx::query!(
        r#"
UPDATE forge_jobs
SET
status = 'cancelled',
cancelled_at = NOW(),
cancel_reason = COALESCE($2, cancel_reason),
expires_at = $3
WHERE id = $1
"#,
        job_id,
        reason,
        expires_at,
    )
    .execute(&self.pool)
    .await?;
    Ok(())
}
/// Releases jobs whose worker appears to have died.
///
/// - `claimed` jobs older than `stale_threshold`, and `running` jobs whose
///   most recent liveness signal (`last_heartbeat`, else `started_at`,
///   else `claimed_at`) is older than the threshold, return to `pending`
///   for another worker to claim (the re-claim increments `attempts`).
/// - `cancel_requested` jobs with a stale liveness signal are finalized as
///   `cancelled`: the worker that was supposed to acknowledge the cancel
///   is gone, and without this they would sit in `cancel_requested`
///   forever (the previous version never released them).
///
/// Returns the total number of rows transitioned.
pub async fn release_stale(
    &self,
    stale_threshold: chrono::Duration,
) -> Result<u64, sqlx::Error> {
    // make_interval(secs => ...) takes double-precision seconds.
    let threshold_secs = stale_threshold.num_seconds() as f64;
    let requeued = sqlx::query!(
        r#"
        UPDATE forge_jobs
        SET
            status = 'pending',
            worker_id = NULL,
            claimed_at = NULL,
            started_at = NULL,
            last_heartbeat = NULL
        WHERE
            (
                status = 'claimed'
                AND claimed_at < NOW() - make_interval(secs => $1)
            )
            OR (
                status = 'running'
                AND COALESCE(last_heartbeat, started_at, claimed_at) < NOW() - make_interval(secs => $1)
            )
        "#,
        threshold_secs,
    )
    .execute(&self.pool)
    .await?;
    // Fix: finalize orphaned cancel_requested jobs instead of leaving them
    // stuck when their worker dies before acknowledging the cancel.
    let finalized = sqlx::query!(
        r#"
        UPDATE forge_jobs
        SET
            status = 'cancelled',
            cancelled_at = NOW(),
            worker_id = NULL,
            last_heartbeat = NULL
        WHERE status = 'cancel_requested'
        AND COALESCE(last_heartbeat, started_at, claimed_at, cancel_requested_at)
            < NOW() - make_interval(secs => $1)
        "#,
        threshold_secs,
    )
    .execute(&self.pool)
    .await?;
    Ok(requeued.rows_affected() + finalized.rows_affected())
}
/// Permanently deletes terminal jobs whose retention TTL (`expires_at`)
/// has elapsed. Rows with no `expires_at` are kept indefinitely.
/// Returns the number of rows deleted.
pub async fn cleanup_expired(&self) -> Result<u64, sqlx::Error> {
    let result = sqlx::query!(
        r#"
DELETE FROM forge_jobs
WHERE expires_at IS NOT NULL
AND expires_at < NOW()
AND status IN ('completed', 'cancelled', 'failed', 'dead_letter')
"#,
    )
    .execute(&self.pool)
    .await?;
    Ok(result.rows_affected())
}
/// Returns per-status job counts in a single aggregate query.
///
/// The `"name!"` aliases tell sqlx the COUNT results are non-null, so the
/// generated fields are plain i64 rather than Option<i64>.
///
/// NOTE(review): jobs in `cancel_requested` are not counted in any bucket,
/// so the bucket sum can be less than the table's total row count.
pub async fn stats(&self) -> Result<QueueStats, sqlx::Error> {
    let row = sqlx::query!(
        r#"
SELECT
COUNT(*) FILTER (WHERE status = 'pending') as "pending!",
COUNT(*) FILTER (WHERE status = 'claimed') as "claimed!",
COUNT(*) FILTER (WHERE status = 'running') as "running!",
COUNT(*) FILTER (WHERE status = 'completed') as "completed!",
COUNT(*) FILTER (WHERE status = 'cancelled') as "cancelled!",
COUNT(*) FILTER (WHERE status = 'failed') as "failed!",
COUNT(*) FILTER (WHERE status = 'dead_letter') as "dead_letter!"
FROM forge_jobs
"#,
    )
    .fetch_one(&self.pool)
    .await?;
    // COUNT(*) is non-negative, so the i64 -> u64 casts cannot wrap.
    Ok(QueueStats {
        pending: row.pending as u64,
        claimed: row.claimed as u64,
        running: row.running as u64,
        completed: row.completed as u64,
        cancelled: row.cancelled as u64,
        failed: row.failed as u64,
        dead_letter: row.dead_letter as u64,
    })
}
}
/// Point-in-time per-status job counts, as returned by `JobQueue::stats`.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    pub pending: u64,
    pub claimed: u64,
    pub running: u64,
    pub completed: u64,
    pub cancelled: u64,
    pub failed: u64,
    pub dead_letter: u64,
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_job_record_creation() {
        let record = JobRecord::new("send_email", serde_json::json!({}), JobPriority::Normal, 3);
        assert_eq!(record.job_type, "send_email");
        assert_eq!(record.status, JobStatus::Pending);
        assert_eq!(record.priority, 50);
        assert_eq!(record.attempts, 0);
        assert_eq!(record.max_attempts, 3);
    }

    #[test]
    fn test_job_record_with_capability() {
        let record = JobRecord::new("transcode", serde_json::json!({}), JobPriority::High, 3)
            .with_capability("media");
        assert_eq!(record.worker_capability.as_deref(), Some("media"));
        assert_eq!(record.priority, 75);
    }

    #[test]
    fn test_job_record_with_idempotency() {
        let record = JobRecord::new("payment", serde_json::json!({}), JobPriority::Critical, 5)
            .with_idempotency_key("payment-123");
        assert_eq!(record.idempotency_key.as_deref(), Some("payment-123"));
    }

    #[test]
    fn test_job_record_with_owner_subject() {
        let record = JobRecord::new("task", serde_json::json!({}), JobPriority::Normal, 3)
            .with_owner_subject(Some("user-123".into()));
        assert_eq!(record.owner_subject.as_deref(), Some("user-123"));
    }

    #[test]
    fn test_priority_ordering() {
        // Numeric priorities must be strictly increasing across the levels.
        let levels = [
            ("a", JobPriority::Background),
            ("b", JobPriority::Low),
            ("c", JobPriority::Normal),
            ("d", JobPriority::High),
            ("e", JobPriority::Critical),
        ];
        let priorities: Vec<i32> = levels
            .into_iter()
            .map(|(name, level)| JobRecord::new(name, serde_json::json!({}), level, 1).priority)
            .collect();
        for pair in priorities.windows(2) {
            assert!(pair[0] < pair[1], "priorities must strictly increase");
        }
    }
}
#[cfg(all(test, feature = "testcontainers"))]
#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::panic)]
mod integration_tests {
    //! End-to-end queue tests against a real Postgres instance.
    //! Gated behind the `testcontainers` feature; each test gets an
    //! isolated database with the system schema applied.
    use super::*;
    use forge_core::testing::{IsolatedTestDb, TestDatabase};

    // Provisions an isolated database named after the test and applies the
    // full system schema so `forge_jobs` exists.
    async fn setup_db(test_name: &str) -> IsolatedTestDb {
        let base = TestDatabase::from_env()
            .await
            .expect("Failed to create test database");
        let db = base
            .isolated(test_name)
            .await
            .expect("Failed to create isolated db");
        let system_sql = crate::migrations::get_all_system_sql();
        db.run_sql(&system_sql)
            .await
            .expect("Failed to apply system schema");
        db
    }

    // Enqueue -> claim round-trip: the claimed record carries the expected
    // status, attempt count, and worker assignment.
    #[tokio::test]
    async fn enqueue_and_claim_job() {
        let db = setup_db("enqueue_and_claim").await;
        let queue = JobQueue::new(db.pool().clone());
        let worker_id = Uuid::new_v4();
        let job = JobRecord::new(
            "send_email",
            serde_json::json!({"to": "a@b.com"}),
            JobPriority::Normal,
            3,
        );
        let job_id = queue.enqueue(job).await.expect("Failed to enqueue");
        let claimed = queue
            .claim(worker_id, &[], 10)
            .await
            .expect("Failed to claim");
        assert_eq!(claimed.len(), 1);
        assert_eq!(claimed[0].id, job_id);
        assert_eq!(claimed[0].job_type, "send_email");
        assert_eq!(claimed[0].status, JobStatus::Claimed);
        assert_eq!(claimed[0].attempts, 1);
        assert!(claimed[0].worker_id.is_some());
        db.cleanup().await.expect("cleanup");
    }

    // Two workers claiming from the same pool must never receive the same
    // job (FOR UPDATE SKIP LOCKED semantics).
    #[tokio::test]
    async fn claim_respects_skip_locked() {
        let db = setup_db("claim_skip_locked").await;
        let queue = JobQueue::new(db.pool().clone());
        for i in 0..3 {
            let job = JobRecord::new(
                format!("job_{i}"),
                serde_json::json!({}),
                JobPriority::Normal,
                3,
            );
            queue.enqueue(job).await.expect("enqueue");
        }
        let worker1 = Uuid::new_v4();
        let batch1 = queue.claim(worker1, &[], 2).await.expect("claim1");
        assert_eq!(batch1.len(), 2);
        let worker2 = Uuid::new_v4();
        let batch2 = queue.claim(worker2, &[], 2).await.expect("claim2");
        assert_eq!(batch2.len(), 1);
        let ids1: Vec<Uuid> = batch1.iter().map(|j| j.id).collect();
        let ids2: Vec<Uuid> = batch2.iter().map(|j| j.id).collect();
        for id in &ids2 {
            assert!(
                !ids1.contains(id),
                "SKIP LOCKED should prevent duplicate claims"
            );
        }
        db.cleanup().await.expect("cleanup");
    }

    // Higher-priority jobs are claimed before lower-priority ones even when
    // enqueued later.
    #[tokio::test]
    async fn claim_respects_priority_ordering() {
        let db = setup_db("claim_priority").await;
        let queue = JobQueue::new(db.pool().clone());
        let worker_id = Uuid::new_v4();
        let low = JobRecord::new("low_job", serde_json::json!({}), JobPriority::Low, 3);
        queue.enqueue(low).await.expect("enqueue low");
        let high = JobRecord::new("high_job", serde_json::json!({}), JobPriority::Critical, 3);
        queue.enqueue(high).await.expect("enqueue high");
        let claimed = queue.claim(worker_id, &[], 1).await.expect("claim");
        assert_eq!(claimed.len(), 1);
        assert_eq!(claimed[0].job_type, "high_job");
        db.cleanup().await.expect("cleanup");
    }

    // Full happy path: enqueue -> claim -> start -> complete moves the job
    // into the completed bucket.
    #[tokio::test]
    async fn complete_job_lifecycle() {
        let db = setup_db("complete_lifecycle").await;
        let queue = JobQueue::new(db.pool().clone());
        let worker_id = Uuid::new_v4();
        let job = JobRecord::new("process", serde_json::json!({}), JobPriority::Normal, 3);
        let job_id = queue.enqueue(job).await.expect("enqueue");
        queue.claim(worker_id, &[], 1).await.expect("claim");
        queue.start(job_id).await.expect("start");
        queue
            .complete(job_id, serde_json::json!({"result": "done"}), None)
            .await
            .expect("complete");
        let stats = queue.stats().await.expect("stats");
        assert_eq!(stats.completed, 1);
        assert_eq!(stats.pending, 0);
        db.cleanup().await.expect("cleanup");
    }

    // fail() with a retry delay returns the job to pending, not dead_letter.
    #[tokio::test]
    async fn fail_with_retry_requeues_as_pending() {
        let db = setup_db("fail_retry").await;
        let queue = JobQueue::new(db.pool().clone());
        let worker_id = Uuid::new_v4();
        let job = JobRecord::new("flaky", serde_json::json!({}), JobPriority::Normal, 3);
        let job_id = queue.enqueue(job).await.expect("enqueue");
        queue.claim(worker_id, &[], 1).await.expect("claim");
        queue.start(job_id).await.expect("start");
        queue
            .fail(
                job_id,
                "transient error",
                Some(chrono::Duration::seconds(0)),
                None,
            )
            .await
            .expect("fail");
        let stats = queue.stats().await.expect("stats");
        assert_eq!(stats.pending, 1);
        assert_eq!(stats.dead_letter, 0);
        db.cleanup().await.expect("cleanup");
    }

    // fail() without a retry delay moves the job straight to dead_letter.
    #[tokio::test]
    async fn fail_without_retry_goes_to_dead_letter() {
        let db = setup_db("fail_dead_letter").await;
        let queue = JobQueue::new(db.pool().clone());
        let worker_id = Uuid::new_v4();
        let job = JobRecord::new("fatal", serde_json::json!({}), JobPriority::Normal, 1);
        let job_id = queue.enqueue(job).await.expect("enqueue");
        queue.claim(worker_id, &[], 1).await.expect("claim");
        queue.start(job_id).await.expect("start");
        queue
            .fail(job_id, "permanent error", None, None)
            .await
            .expect("fail");
        let stats = queue.stats().await.expect("stats");
        assert_eq!(stats.dead_letter, 1);
        assert_eq!(stats.pending, 0);
        db.cleanup().await.expect("cleanup");
    }

    // A second enqueue with the same idempotency key returns the first
    // job's id and inserts no new row.
    #[tokio::test]
    async fn idempotency_key_deduplicates() {
        let db = setup_db("idempotency").await;
        let queue = JobQueue::new(db.pool().clone());
        let job1 = JobRecord::new("pay", serde_json::json!({}), JobPriority::Normal, 3)
            .with_idempotency_key("pay-123");
        let id1 = queue.enqueue(job1).await.expect("enqueue1");
        let job2 = JobRecord::new(
            "pay",
            serde_json::json!({"amount": 200}),
            JobPriority::Normal,
            3,
        )
        .with_idempotency_key("pay-123");
        let id2 = queue.enqueue(job2).await.expect("enqueue2");
        assert_eq!(id1, id2, "Idempotency key should return same job ID");
        let stats = queue.stats().await.expect("stats");
        assert_eq!(stats.pending, 1);
        db.cleanup().await.expect("cleanup");
    }

    // request_cancel on a pending job cancels it immediately (no
    // cancel_requested intermediate state).
    #[tokio::test]
    async fn cancel_pending_job() {
        let db = setup_db("cancel_pending").await;
        let queue = JobQueue::new(db.pool().clone());
        let job = JobRecord::new("task", serde_json::json!({}), JobPriority::Normal, 3);
        let job_id = queue.enqueue(job).await.expect("enqueue");
        let cancelled = queue
            .request_cancel(job_id, Some("no longer needed"), None)
            .await
            .expect("cancel");
        assert!(cancelled);
        let stats = queue.stats().await.expect("stats");
        assert_eq!(stats.cancelled, 1);
        assert_eq!(stats.pending, 0);
        db.cleanup().await.expect("cleanup");
    }

    // Only the owning subject may cancel an owned job.
    #[tokio::test]
    async fn cancel_respects_ownership() {
        let db = setup_db("cancel_ownership").await;
        let queue = JobQueue::new(db.pool().clone());
        let job = JobRecord::new("task", serde_json::json!({}), JobPriority::Normal, 3)
            .with_owner_subject(Some("user-alice".into()));
        let job_id = queue.enqueue(job).await.expect("enqueue");
        let denied = queue
            .request_cancel(job_id, Some("reason"), Some("user-bob"))
            .await
            .expect("cancel attempt");
        assert!(!denied, "Should deny cancellation from non-owner");
        let allowed = queue
            .request_cancel(job_id, Some("reason"), Some("user-alice"))
            .await
            .expect("cancel");
        assert!(allowed, "Should allow owner to cancel");
        db.cleanup().await.expect("cleanup");
    }

    // A capability-restricted job is invisible to workers that do not
    // advertise the capability.
    #[tokio::test]
    async fn claim_respects_worker_capability() {
        let db = setup_db("claim_capability").await;
        let queue = JobQueue::new(db.pool().clone());
        let job = JobRecord::new("render", serde_json::json!({}), JobPriority::Normal, 3)
            .with_capability("gpu");
        queue.enqueue(job).await.expect("enqueue");
        let worker_no_cap = Uuid::new_v4();
        let claimed = queue
            .claim(worker_no_cap, &["cpu".into()], 10)
            .await
            .expect("claim");
        assert!(
            claimed.is_empty(),
            "Worker without gpu cap should not claim gpu job"
        );
        let worker_with_cap = Uuid::new_v4();
        let claimed = queue
            .claim(worker_with_cap, &["gpu".into()], 10)
            .await
            .expect("claim");
        assert_eq!(claimed.len(), 1);
        db.cleanup().await.expect("cleanup");
    }

    // Smoke test: heartbeat on a running job succeeds.
    #[tokio::test]
    async fn heartbeat_updates_timestamp() {
        let db = setup_db("heartbeat").await;
        let queue = JobQueue::new(db.pool().clone());
        let worker_id = Uuid::new_v4();
        let job = JobRecord::new("long_task", serde_json::json!({}), JobPriority::Normal, 3);
        let job_id = queue.enqueue(job).await.expect("enqueue");
        queue.claim(worker_id, &[], 1).await.expect("claim");
        queue.start(job_id).await.expect("start");
        queue.heartbeat(job_id).await.expect("heartbeat");
        db.cleanup().await.expect("cleanup");
    }

    // Progress updates persist; the latest write wins.
    #[tokio::test]
    async fn progress_updates_persist() {
        let db = setup_db("progress").await;
        let queue = JobQueue::new(db.pool().clone());
        let worker_id = Uuid::new_v4();
        let job = JobRecord::new("export", serde_json::json!({}), JobPriority::Normal, 3);
        let job_id = queue.enqueue(job).await.expect("enqueue");
        queue.claim(worker_id, &[], 1).await.expect("claim");
        queue.start(job_id).await.expect("start");
        queue
            .update_progress(job_id, 50, "Processing...")
            .await
            .expect("progress");
        queue
            .update_progress(job_id, 100, "Done")
            .await
            .expect("progress");
        let row: (Option<i32>, Option<String>) = sqlx::query_as(
            "SELECT progress_percent, progress_message FROM forge_jobs WHERE id = $1",
        )
        .bind(job_id)
        .fetch_one(db.pool())
        .await
        .expect("query");
        assert_eq!(row.0, Some(100));
        assert_eq!(row.1.as_deref(), Some("Done"));
        db.cleanup().await.expect("cleanup");
    }

    // stats() reflects the pending count and leaves other buckets at zero.
    #[tokio::test]
    async fn queue_stats_accurate() {
        let db = setup_db("stats").await;
        let queue = JobQueue::new(db.pool().clone());
        let stats = queue.stats().await.expect("stats");
        assert_eq!(stats.pending, 0);
        for _ in 0..3 {
            let job = JobRecord::new("task", serde_json::json!({}), JobPriority::Normal, 3);
            queue.enqueue(job).await.expect("enqueue");
        }
        let stats = queue.stats().await.expect("stats");
        assert_eq!(stats.pending, 3);
        assert_eq!(stats.running, 0);
        assert_eq!(stats.completed, 0);
        db.cleanup().await.expect("cleanup");
    }
}