use std::sync::Arc;
use serde_json::json;
use rustqueue::JobState;
use rustqueue::config::RetentionConfig;
use rustqueue::engine::queue::{JobOptions, QueueManager};
use rustqueue::engine::scheduler::start_scheduler;
use rustqueue::storage::MemoryStorage;
#[tokio::test]
async fn test_scheduler_promotes_delayed_jobs() {
let storage = Arc::new(MemoryStorage::new());
let manager = Arc::new(QueueManager::new(storage));
let opts = JobOptions {
delay_ms: Some(50),
..Default::default()
};
let id = manager
.push("work", "delayed-task", json!({}), Some(opts))
.await
.unwrap();
let job = manager.get_job(id).await.unwrap().unwrap();
assert_eq!(job.state, JobState::Delayed);
let pulled = manager.pull("work", 1).await.unwrap();
assert!(pulled.is_empty(), "Delayed job should not be pullable yet");
let scheduler = start_scheduler(Arc::clone(&manager), 20, 30_000, RetentionConfig::default());
tokio::time::sleep(std::time::Duration::from_millis(150)).await;
let pulled = manager.pull("work", 1).await.unwrap();
assert_eq!(
pulled.len(),
1,
"Scheduler should have promoted the delayed job"
);
assert_eq!(pulled[0].id, id);
assert_eq!(pulled[0].state, JobState::Active);
scheduler.abort();
}
#[tokio::test]
async fn test_scheduler_detects_timed_out_jobs() {
let storage = Arc::new(MemoryStorage::new());
let manager = Arc::new(QueueManager::new(storage));
let opts = JobOptions {
timeout_ms: Some(10),
..Default::default()
};
let id = manager
.push("work", "timeout-task", json!({}), Some(opts))
.await
.unwrap();
manager.pull("work", 1).await.unwrap();
let job = manager.get_job(id).await.unwrap().unwrap();
assert_eq!(job.state, JobState::Active);
let scheduler = start_scheduler(Arc::clone(&manager), 20, 30_000, RetentionConfig::default());
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let job = manager.get_job(id).await.unwrap().unwrap();
assert!(
matches!(job.state, JobState::Delayed | JobState::Waiting),
"Timed out job should be retried, got {:?}",
job.state
);
assert_eq!(job.last_error, Some("job timed out".to_string()));
scheduler.abort();
}