#![cfg(feature = "worker")]
mod common;
use std::sync::Arc;
use std::time::{Duration, Instant};
use rust_job_queue_api_worker_system::{
queue,
worker::{SimulatedExecutor, WorkerRuntime},
JobKind, JobStatus, NewJob,
};
use serde_json::json;
use tokio_util::sync::CancellationToken;
#[tokio::test]
async fn worker_processes_queued_job_to_succeeded() {
let pool = common::fresh_pool().await;
let job = queue::enqueue(
&pool,
NewJob {
kind: JobKind::SummarizeText,
payload: json!({"text": "lorem ipsum"}),
max_attempts: Some(3),
idempotency_key: None,
},
)
.await
.expect("enqueue");
let id = job.job().id;
let cancel = CancellationToken::new();
let runtime = WorkerRuntime::new(pool.clone(), Arc::new(SimulatedExecutor))
.with_concurrency(1)
.with_poll_interval(Duration::from_millis(20), Duration::from_millis(100));
let cancel_for_runtime = cancel.clone();
let task = tokio::spawn(async move { runtime.run(cancel_for_runtime).await });
let deadline = Instant::now() + Duration::from_secs(15);
let final_status = loop {
let cur = queue::get(&pool, id).await.expect("get").expect("present");
if cur.status == JobStatus::Succeeded || cur.status == JobStatus::FailedPermanent {
break cur.status;
}
if Instant::now() > deadline {
panic!("timeout: status={:?} attempts={}", cur.status, cur.attempts);
}
tokio::time::sleep(Duration::from_millis(80)).await;
};
cancel.cancel();
let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
assert_eq!(final_status, JobStatus::Succeeded);
let after = queue::get(&pool, id).await.unwrap().unwrap();
assert!(after.locked_by.is_none());
assert!(after.locked_at.is_none());
}