#![cfg(feature = "worker")]
mod common;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use rust_job_queue_api_worker_system::{
queue,
worker::{ExecutionContext, ExecutionOutcome, Executor, WorkerRuntime},
Job, JobKind, JobStatus, NewJob,
};
use serde_json::json;
use tokio_util::sync::CancellationToken;
struct LongExecutor;
#[async_trait]
impl Executor for LongExecutor {
async fn execute(&self, ctx: &ExecutionContext, job: &Job) -> ExecutionOutcome {
for _step in 0..10 {
match queue::get(&ctx.pool, job.id).await {
Ok(Some(j)) if j.cancel_requested => return ExecutionOutcome::Cancelled,
Ok(_) => {}
Err(e) => return ExecutionOutcome::Failed(format!("db: {e}")),
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
ExecutionOutcome::Succeeded
}
}
#[tokio::test]
async fn cancel_mid_run_lands_in_cancelled() {
let pool = common::fresh_pool().await;
let job = queue::enqueue(
&pool,
NewJob {
kind: JobKind::ResizeImage,
payload: json!({"source_url": "x", "width": 1, "height": 1}),
max_attempts: Some(1),
idempotency_key: None,
},
)
.await
.unwrap();
let id = job.job().id;
let cancel_token = CancellationToken::new();
let runtime = WorkerRuntime::new(pool.clone(), Arc::new(LongExecutor))
.with_concurrency(1)
.with_poll_interval(Duration::from_millis(20), Duration::from_millis(100));
let token_for_runtime = cancel_token.clone();
let task = tokio::spawn(async move { runtime.run(token_for_runtime).await });
let deadline = Instant::now() + Duration::from_secs(5);
loop {
let cur = queue::get(&pool, id).await.unwrap().unwrap();
if cur.status == JobStatus::Running {
break;
}
if Instant::now() > deadline {
panic!("job never started running: status={:?}", cur.status);
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
let outcome = queue::request_cancel(&pool, id).await.unwrap();
assert_eq!(
outcome,
rust_job_queue_api_worker_system::queue::CancelOutcome::PendingOnWorker
);
let deadline = Instant::now() + Duration::from_secs(5);
loop {
let cur = queue::get(&pool, id).await.unwrap().unwrap();
if cur.status == JobStatus::Cancelled {
break;
}
if Instant::now() > deadline {
panic!(
"job did not reach cancelled: status={:?} cancel_requested={}",
cur.status, cur.cancel_requested
);
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
cancel_token.cancel();
let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
}