rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Integration test: cancel a job while it is mid-execution. The worker
//! must observe the `cancel_requested` flag between sub-steps and transition
//! the job to `cancelled`, not `succeeded`.

#![cfg(feature = "worker")]

mod common;

use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use rust_job_queue_api_worker_system::{
    queue,
    worker::{ExecutionContext, ExecutionOutcome, Executor, WorkerRuntime},
    Job, JobKind, JobStatus, NewJob,
};
use serde_json::json;
use tokio_util::sync::CancellationToken;

/// Test-only executor that runs for a bounded number of short steps and
/// re-reads the job row before each one.
///
/// Checking the persisted `cancel_requested` flag between steps gives the
/// test a deterministic window in which a cancel request issued while the
/// job is running is noticed before the executor would otherwise report
/// success.
struct LongExecutor;

#[async_trait]
impl Executor for LongExecutor {
    /// Runs up to ten steps, polling the job row for a cancel request before
    /// each one.
    ///
    /// Returns `ExecutionOutcome::Cancelled` as soon as the freshly loaded row
    /// has `cancel_requested` set, `ExecutionOutcome::Failed` with a `db: …`
    /// message if the lookup itself errors, and `ExecutionOutcome::Succeeded`
    /// once every step has completed without a cancel request being seen.
    async fn execute(&self, ctx: &ExecutionContext, job: &Job) -> ExecutionOutcome {
        const STEPS: usize = 10;
        const STEP_PAUSE: Duration = Duration::from_millis(50);

        let mut completed = 0;
        while completed < STEPS {
            // Reload the row each step: the cancel flag is written by another
            // connection, so the `job` snapshot passed in would never show it.
            let latest = match queue::get(&ctx.pool, job.id).await {
                Ok(row) => row,
                Err(e) => return ExecutionOutcome::Failed(format!("db: {e}")),
            };

            // A missing row is treated the same as "no cancel requested".
            if latest.map_or(false, |row| row.cancel_requested) {
                return ExecutionOutcome::Cancelled;
            }

            tokio::time::sleep(STEP_PAUSE).await;
            completed += 1;
        }

        ExecutionOutcome::Succeeded
    }
}

/// Verifies that cancelling a job while it is `Running` ends with the job in
/// `JobStatus::Cancelled`.
///
/// Flow: enqueue one job, start a single-slot worker runtime driven by
/// [`LongExecutor`], poll until the job is observed running, request
/// cancellation (which must report `CancelOutcome::PendingOnWorker`), then
/// poll until the job is observed cancelled.
///
/// # Panics
///
/// Panics (failing the test) if either polling phase exceeds its deadline,
/// if any queue call errors, if the job row disappears, or if the spawned
/// worker task itself panicked.
#[tokio::test]
async fn cancel_mid_run_lands_in_cancelled() {
    // Upper bound for each polling phase and for the worker shutdown wait.
    const PHASE_TIMEOUT: Duration = Duration::from_secs(5);
    // Delay between status polls while waiting for the job to start.
    const START_POLL: Duration = Duration::from_millis(20);
    // Delay between status polls while waiting for the cancelled state.
    const CANCEL_POLL: Duration = Duration::from_millis(50);

    let pool = common::fresh_pool().await;

    let job = queue::enqueue(
        &pool,
        NewJob {
            kind: JobKind::ResizeImage,
            payload: json!({"source_url": "x", "width": 1, "height": 1}),
            max_attempts: Some(1),
            idempotency_key: None,
        },
    )
    .await
    .expect("enqueue failed");
    let id = job.job().id;

    let cancel_token = CancellationToken::new();
    let runtime = WorkerRuntime::new(pool.clone(), Arc::new(LongExecutor))
        .with_concurrency(1)
        .with_poll_interval(Duration::from_millis(20), Duration::from_millis(100));
    let token_for_runtime = cancel_token.clone();
    let task = tokio::spawn(async move { runtime.run(token_for_runtime).await });

    // Wait until the job is running.
    let deadline = Instant::now() + PHASE_TIMEOUT;
    loop {
        let cur = queue::get(&pool, id)
            .await
            .expect("job lookup failed")
            .expect("job row missing");
        if cur.status == JobStatus::Running {
            break;
        }
        if Instant::now() > deadline {
            panic!("job never started running: status={:?}", cur.status);
        }
        tokio::time::sleep(START_POLL).await;
    }

    // Cancel mid-flight. The job is held by a worker, so the request can only
    // be recorded for the worker to act on.
    let outcome = queue::request_cancel(&pool, id)
        .await
        .expect("cancel request failed");
    assert_eq!(outcome, queue::CancelOutcome::PendingOnWorker);

    // Wait for terminal cancelled state.
    let deadline = Instant::now() + PHASE_TIMEOUT;
    loop {
        let cur = queue::get(&pool, id)
            .await
            .expect("job lookup failed")
            .expect("job row missing");
        if cur.status == JobStatus::Cancelled {
            break;
        }
        if Instant::now() > deadline {
            panic!(
                "job did not reach cancelled: status={:?} cancel_requested={}",
                cur.status, cur.cancel_requested
            );
        }
        tokio::time::sleep(CANCEL_POLL).await;
    }

    // Stop the runtime. A shutdown that overruns the timeout is deliberately
    // tolerated (best-effort cleanup), but a worker task that panicked is
    // surfaced rather than silently discarded with the join result.
    cancel_token.cancel();
    if let Ok(Err(join_err)) = tokio::time::timeout(PHASE_TIMEOUT, task).await {
        panic!("worker runtime task failed: {join_err}");
    }
}