use queue_workers::{
worker::{Worker, WorkerConfig},
};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::Mutex;
mod common;
use common::{RetryCondition, TestJob, TestQueue};
/// A job that is not configured to fail is expected to be executed exactly once.
///
/// The worker is asked to shut down as soon as the job's execution-complete
/// notifier fires, and the attempt counter is inspected afterwards.
///
/// NOTE(review): the assertion message mentions `RetryCondition::Never`, but this
/// test never sets a retry condition — presumably that is `TestJob`'s default;
/// confirm in the `common` module.
#[tokio::test]
async fn test_worker_job_suceeds_without_retries() {
    let attempt_counter = Arc::new(Mutex::new(0));
    let job_done = Arc::new(tokio::sync::Notify::new());

    // The single job shares the counter and the notifier with the test body.
    let queue = TestQueue {
        jobs: Arc::new(Mutex::new(vec![TestJob::new()
            .with_attempts(Arc::clone(&attempt_counter))
            .with_execution_complete_notifier(Arc::clone(&job_done))])),
    };
    let worker_config = WorkerConfig {
        retry_attempts: 3,
        retry_delay: Duration::from_millis(50),
        shutdown_timeout: Duration::from_secs(1),
    };

    let (stop_tx, mut stop_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(queue, worker_config);

    // Request shutdown only once the job reports that it has finished executing.
    tokio::spawn(async move {
        job_done.notified().await;
        stop_tx.send(()).unwrap();
    });

    let shutdown_signal = async move {
        let _ = stop_rx.recv().await;
    };
    worker.start(shutdown_signal).await.unwrap();

    let observed_attempts = *attempt_counter.lock().await;
    assert_eq!(
        observed_attempts, 1,
        "Job should only be attempted once with RetryCondition::Never"
    );
}
/// An always-failing job with `RetryCondition::Always` is expected to be executed
/// four times when the worker is configured with `retry_attempts: 3`.
///
/// Shutdown is requested only after the before-retry notifier has been observed
/// three times.
#[tokio::test]
async fn test_worker_job_retries_until_it_fails() {
    let attempt_counter = Arc::new(Mutex::new(0));
    let before_retry = Arc::new(tokio::sync::Notify::new());

    let failing_job = TestJob::new()
        .with_attempts(Arc::clone(&attempt_counter))
        .with_should_fail(true)
        .with_retry_conditions(RetryCondition::Always)
        .with_before_retry_notifier(Arc::clone(&before_retry));

    let worker_config = WorkerConfig {
        retry_attempts: 3,
        retry_delay: Duration::from_millis(1),
        shutdown_timeout: Duration::from_secs(1),
    };
    let queue = TestQueue {
        jobs: Arc::new(Mutex::new(vec![failing_job])),
    };

    let (stop_tx, mut stop_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(queue, worker_config);

    // Wait for three before-retry notifications, then ask the worker to stop.
    tokio::spawn(async move {
        for _retry in 0..3 {
            before_retry.notified().await;
        }
        stop_tx.send(()).unwrap();
    });

    let shutdown_signal = async move {
        let _ = stop_rx.recv().await;
    };
    worker.start(shutdown_signal).await.unwrap();

    let observed_attempts = *attempt_counter.lock().await;
    assert_eq!(
        observed_attempts, 4,
        "Job should be attempted 4 times (initial + 3 retries)"
    );
}
/// A failing job configured with `RetryCondition::OnlyOnAttempt(1)` is expected to
/// be executed twice in total, even though the worker allows three retries.
///
/// Shutdown is requested after the first before-retry notification.
#[tokio::test]
async fn test_worker_job_retries_once() {
    let attempt_counter = Arc::new(Mutex::new(0));
    let before_retry = Arc::new(tokio::sync::Notify::new());

    let worker_config = WorkerConfig {
        retry_attempts: 3,
        retry_delay: Duration::from_millis(1),
        shutdown_timeout: Duration::from_secs(1),
    };

    let failing_job = TestJob::new()
        .with_attempts(Arc::clone(&attempt_counter))
        .with_should_fail(true)
        .with_retry_conditions(RetryCondition::OnlyOnAttempt(1))
        .with_before_retry_notifier(Arc::clone(&before_retry));
    let queue = TestQueue {
        jobs: Arc::new(Mutex::new(vec![failing_job])),
    };

    let (stop_tx, mut stop_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(queue, worker_config);

    // A single before-retry notification is enough to trigger shutdown.
    tokio::spawn(async move {
        before_retry.notified().await;
        stop_tx.send(()).unwrap();
    });

    let shutdown_signal = async move {
        let _ = stop_rx.recv().await;
    };
    worker.start(shutdown_signal).await.unwrap();

    let observed_attempts = *attempt_counter.lock().await;
    assert_eq!(observed_attempts, 2, "Job should only be attempted twice");
}
/// A failing job configured with `RetryCondition::UntilAttempt(2)` is expected to
/// be executed three times in total, although the worker would allow five retries.
///
/// Shutdown is requested after two before-retry notifications.
#[tokio::test]
async fn test_worker_job_retries_twice() {
    let attempt_counter = Arc::new(Mutex::new(0));
    let before_retry = Arc::new(tokio::sync::Notify::new());

    let failing_job = TestJob::new()
        .with_attempts(Arc::clone(&attempt_counter))
        .with_should_fail(true)
        .with_retry_conditions(RetryCondition::UntilAttempt(2))
        .with_before_retry_notifier(Arc::clone(&before_retry));
    let queue = TestQueue {
        jobs: Arc::new(Mutex::new(vec![failing_job])),
    };

    // The worker's own limit (5) is deliberately higher than the job's condition.
    let worker_config = WorkerConfig {
        retry_attempts: 5,
        retry_delay: Duration::from_millis(50),
        shutdown_timeout: Duration::from_secs(1),
    };

    let (stop_tx, mut stop_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(queue, worker_config);

    tokio::spawn(async move {
        for _retry in 0..2 {
            before_retry.notified().await;
        }
        stop_tx.send(()).unwrap();
    });

    let shutdown_signal = async move {
        let _ = stop_rx.recv().await;
    };
    worker.start(shutdown_signal).await.unwrap();

    let observed_attempts = *attempt_counter.lock().await;
    assert_eq!(
        observed_attempts, 3,
        "Job should be attempted 3 times (initial + 2 retries)"
    );
}
/// Even with `RetryCondition::Always`, a failing job is expected to be executed only
/// `retry_attempts + 1` times, where `retry_attempts` comes from the `WorkerConfig`.
///
/// The same `retry_attempts` value drives the config, the number of before-retry
/// notifications awaited before shutdown, and the final assertion.
#[tokio::test]
async fn test_worker_job_respects_worker_config_retry_limit() {
    let retry_attempts = 2;

    let attempt_counter = Arc::new(Mutex::new(0));
    let before_retry = Arc::new(tokio::sync::Notify::new());

    let failing_job = TestJob::new()
        .with_attempts(Arc::clone(&attempt_counter))
        .with_should_fail(true)
        .with_retry_conditions(RetryCondition::Always)
        .with_before_retry_notifier(Arc::clone(&before_retry));
    let queue = TestQueue {
        jobs: Arc::new(Mutex::new(vec![failing_job])),
    };
    let worker_config = WorkerConfig {
        retry_attempts,
        retry_delay: Duration::from_millis(1),
        shutdown_timeout: Duration::from_secs(1),
    };

    let (stop_tx, mut stop_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(queue, worker_config);

    // One notification is awaited per configured retry before shutting down.
    tokio::spawn(async move {
        for _retry in 0..retry_attempts {
            before_retry.notified().await;
        }
        stop_tx.send(()).unwrap();
    });

    let shutdown_signal = async move {
        let _ = stop_rx.recv().await;
    };
    worker.start(shutdown_signal).await.unwrap();

    let observed_attempts = *attempt_counter.lock().await;
    assert_eq!(
        observed_attempts,
        retry_attempts + 1,
        "Job should be attempted 3 times (initial + 2 retries) despite Always retry condition"
    );
}
/// A queued job is expected to have been executed exactly once when shutdown is
/// requested 10 ms after the worker is started.
///
/// After the worker returns, the test waits up to 100 ms for the job's
/// execution-complete notifier and then checks the attempt counter.
///
/// NOTE(review): the notifier is awaited only after the worker has returned, so
/// this presumably relies on the job signalling with `Notify::notify_one` (which
/// stores a permit for a later `notified()`) — confirm in the `common` module.
#[tokio::test]
async fn test_worker_completes_job_during_shutdown() {
    let completion_notifier = Arc::new(tokio::sync::Notify::new());
    let job = TestJob::new().with_execution_complete_notifier(completion_notifier.clone());
    // Grab the job's own attempt counter before the job is moved into the queue.
    let attempts = job.attempts.clone();
    let queue = TestQueue {
        jobs: Arc::new(Mutex::new(vec![job])),
    };
    let config = WorkerConfig {
        retry_attempts: 1,
        retry_delay: Duration::from_millis(1),
        shutdown_timeout: Duration::from_secs(1),
    };
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(queue, config);

    // Give the worker a short head start before requesting shutdown.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        shutdown_tx.send(()).unwrap();
    });

    worker
        .start(async move {
            // Every outcome of `recv` (a message, `Closed`, or `Lagged`) simply ends
            // the wait, so the result is discarded like in the sibling tests.
            let _ = shutdown_rx.recv().await;
        })
        .await
        .unwrap();

    let completed =
        tokio::time::timeout(Duration::from_millis(100), completion_notifier.notified())
            .await
            .is_ok();
    assert!(
        completed,
        "Job should have completed during shutdown grace period"
    );

    let final_attempts = *attempts.lock().await;
    assert_eq!(final_attempts, 1, "Job should have been attempted once");
}
/// When shutdown is requested immediately, at least one of four queued one-second
/// jobs is expected to still be in the queue after the worker returns.
///
/// The queue is cloned into the worker so the test can inspect `queue.jobs`
/// afterwards.
#[tokio::test]
async fn test_worker_leaves_jobs_in_queue_on_shutdown() {
    // Four identical jobs, each configured with a one-second duration.
    let slow_jobs: Vec<_> = (0..4)
        .map(|_| TestJob::new().with_duration(Duration::from_secs(1)))
        .collect();
    let queue = TestQueue {
        jobs: Arc::new(Mutex::new(slow_jobs)),
    };

    let worker_config = WorkerConfig {
        retry_attempts: 1,
        retry_delay: Duration::from_millis(50),
        shutdown_timeout: Duration::from_millis(100),
    };

    let (stop_tx, mut stop_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(queue.clone(), worker_config);

    // The shutdown message is sent without waiting for anything.
    tokio::spawn(async move {
        stop_tx.send(()).unwrap();
    });

    let shutdown_signal = async move {
        let _ = stop_rx.recv().await;
    };
    worker.start(shutdown_signal).await.unwrap();

    let left_in_queue = queue.jobs.lock().await.len();
    assert!(
        left_in_queue > 0,
        "Jobs should remain in queue after immediate shutdown"
    );
}
/// When shutdown is requested right after the before-retry notifier fires, the
/// failing job is expected to have been executed only once.
///
/// The retry delay (1 s) is much longer than the shutdown timeout (100 ms), so
/// the test expects the pending retry never to run.
#[tokio::test]
async fn test_worker_shutdown_during_job_retry_delay() {
    let attempt_counter = Arc::new(Mutex::new(0));
    let before_retry = Arc::new(tokio::sync::Notify::new());

    let failing_job = TestJob::new()
        .with_attempts(Arc::clone(&attempt_counter))
        .with_should_fail(true)
        .with_retry_conditions(RetryCondition::Always)
        .with_duration(Duration::from_millis(50))
        .with_before_retry_notifier(Arc::clone(&before_retry));
    let queue = TestQueue {
        jobs: Arc::new(Mutex::new(vec![failing_job])),
    };

    let worker_config = WorkerConfig {
        retry_attempts: 3,
        retry_delay: Duration::from_secs(1),
        shutdown_timeout: Duration::from_millis(100),
    };

    let (stop_tx, mut stop_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(queue, worker_config);

    // Trigger shutdown as soon as the job announces an upcoming retry.
    tokio::spawn(async move {
        before_retry.notified().await;
        stop_tx.send(()).unwrap();
    });

    let shutdown_signal = async move {
        let _ = stop_rx.recv().await;
    };
    worker.start(shutdown_signal).await.unwrap();

    let observed_attempts = *attempt_counter.lock().await;
    assert_eq!(
        observed_attempts, 1,
        "Job should not retry when shutdown occurs during retry delay"
    );
}
/// A worker with an empty queue is expected to return in under one second once
/// shutdown is requested.
///
/// NOTE(review): the "started" message is sent before the worker future is first
/// awaited, so it does not prove the worker is running when shutdown is sent —
/// presumably `start` does no work until polled; confirm against `Worker::start`.
#[tokio::test]
async fn test_worker_shutdown_with_empty_queue() {
    let worker_config = WorkerConfig {
        retry_attempts: 3,
        retry_delay: Duration::from_millis(50),
        shutdown_timeout: Duration::from_millis(500),
    };
    let empty_queue = TestQueue {
        jobs: Arc::new(Mutex::new(vec![])),
    };

    let (stop_tx, mut stop_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(empty_queue, worker_config);
    let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();

    // Shutdown is sent once the test body signals through the oneshot channel.
    tokio::spawn(async move {
        let _ = ready_rx.await;
        stop_tx.send(()).unwrap();
    });

    let began_at = std::time::Instant::now();
    let shutdown_signal = async move {
        let _ = stop_rx.recv().await;
    };
    let running_worker = worker.start(shutdown_signal);
    let _ = ready_tx.send(());
    running_worker.await.unwrap();

    let elapsed = began_at.elapsed();
    assert!(
        elapsed < Duration::from_secs(1),
        "Worker should shut down quickly with empty queue"
    );
}
/// Dropping the shutdown sender (instead of sending a message) is expected to
/// stop the worker while still letting the in-flight job finish.
///
/// The sender is dropped once the job's `started_execution` flag is set. The
/// shutdown future discards the result of `recv`, so the resulting closed-channel
/// error ends the wait just like a message would. Afterwards the test expects the
/// completion notifier to fire within 200 ms, exactly one attempt, and the
/// completion flag to be set.
#[tokio::test]
async fn test_worker_shutdown_signal_channel_closed() {
    let attempts = Arc::new(Mutex::new(0));
    let completed = Arc::new(AtomicBool::new(false));
    let execution_complete_notifier = Arc::new(tokio::sync::Notify::new());
    let job = TestJob::new()
        .with_attempts(attempts.clone())
        .with_duration(Duration::from_millis(100))
        .with_completion_flag(completed.clone())
        .with_execution_complete_notifier(execution_complete_notifier.clone());
    // Clone the flag straight from the job before it is moved into the queue,
    // rather than locking the queue afterwards just to look the job up again.
    let started_execution = job.started_execution.clone();
    let queue = TestQueue {
        jobs: Arc::new(Mutex::new(vec![job])),
    };
    let config = WorkerConfig {
        retry_attempts: 3,
        retry_delay: Duration::from_millis(50),
        shutdown_timeout: Duration::from_secs(3),
    };
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(queue, config);

    // Poll until the job reports that it has started, then close the channel.
    tokio::spawn(async move {
        while !started_execution.load(Ordering::SeqCst) {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        drop(shutdown_tx);
    });

    worker
        .start(async move {
            let _ = shutdown_rx.recv().await;
        })
        .await
        .unwrap();

    let completed_in_time = tokio::time::timeout(
        Duration::from_millis(200),
        execution_complete_notifier.notified(),
    )
    .await
    .is_ok();
    assert!(completed_in_time, "Job should have completed in time");

    let final_attempts = *attempts.lock().await;
    let job_completed = completed.load(Ordering::Relaxed);
    assert_eq!(final_attempts, 1, "Job should be attempted exactly once");
    assert!(job_completed, "Job should have completed successfully");
}
/// A job configured with a 100 ms duration is expected NOT to complete when the
/// shutdown timeout is only 50 ms.
///
/// Shutdown is requested once the job's `started_execution` flag is set. The
/// test then checks that the worker took at least 50 ms but less than 100 ms to
/// return, and that the job's `completed` flag is still unset.
///
/// NOTE(review): both bounds are wall-clock measurements taken from before the
/// worker starts; the upper bound may be sensitive to a heavily loaded machine.
#[tokio::test]
async fn test_worker_graceful_shutdown_cancels_ongoing_job() {
    let long_job = TestJob::new()
        .with_duration(Duration::from_millis(100))
        .with_should_fail(false);
    // Keep handles to the job's flags before it is moved into the queue.
    let job_started = long_job.started_execution.clone();
    let job_finished = long_job.completed.clone();

    let worker_config = WorkerConfig {
        retry_attempts: 1,
        retry_delay: Duration::from_millis(5),
        shutdown_timeout: Duration::from_millis(50),
    };
    let queue = TestQueue {
        jobs: Arc::new(Mutex::new(vec![long_job])),
    };

    let (stop_tx, mut stop_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(queue, worker_config);

    // Poll the started flag every millisecond, then request shutdown.
    tokio::spawn(async move {
        while !job_started.load(Ordering::SeqCst) {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        stop_tx.send(()).unwrap();
    });

    let began_at = std::time::Instant::now();
    let shutdown_signal = async move {
        let _ = stop_rx.recv().await;
    };
    worker.start(shutdown_signal).await.unwrap();
    let elapsed = began_at.elapsed();

    assert!(
        elapsed >= Duration::from_millis(50),
        "Worker should wait for the full shutdown timeout"
    );
    assert!(
        elapsed < Duration::from_millis(100),
        "Worker should not wait for the entire job duration"
    );
    assert!(
        !job_finished.load(Ordering::Relaxed),
        "Job should not have completed due to shutdown timeout"
    );
}
/// A job configured with a 50 ms duration is expected to complete when the
/// shutdown timeout is 100 ms.
///
/// Shutdown is requested once the job's `started_execution` flag is set. After
/// the worker returns, the test expects the completion notifier to fire within
/// 100 ms, the `completed` flag to be set, and the worker to have taken at least
/// 50 ms but less than 100 ms.
///
/// NOTE(review): the timing bounds are wall-clock measurements and may be
/// sensitive to a heavily loaded machine.
#[tokio::test]
async fn test_worker_graceful_shutdown_completes_job() {
    let job_done = Arc::new(tokio::sync::Notify::new());
    let short_job = TestJob::new()
        .with_duration(Duration::from_millis(50))
        .with_should_fail(false)
        .with_execution_complete_notifier(Arc::clone(&job_done));
    // Keep handles to the job's flags before it is moved into the queue.
    let job_started = short_job.started_execution.clone();
    let job_finished = short_job.completed.clone();

    let worker_config = WorkerConfig {
        retry_attempts: 1,
        retry_delay: Duration::from_millis(50),
        shutdown_timeout: Duration::from_millis(100),
    };
    let queue = TestQueue {
        jobs: Arc::new(Mutex::new(vec![short_job])),
    };

    let (stop_tx, mut stop_rx) = tokio::sync::broadcast::channel::<()>(1);
    let worker = Worker::new(queue, worker_config);

    // Poll the started flag every millisecond, then request shutdown.
    tokio::spawn(async move {
        while !job_started.load(Ordering::SeqCst) {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        stop_tx.send(()).unwrap();
    });

    let began_at = std::time::Instant::now();
    let shutdown_signal = async move {
        let _ = stop_rx.recv().await;
    };
    worker.start(shutdown_signal).await.unwrap();
    let elapsed = began_at.elapsed();

    let notified_in_time = tokio::time::timeout(Duration::from_millis(100), job_done.notified())
        .await
        .is_ok();
    assert!(notified_in_time, "Job should have completed in time");
    assert!(
        job_finished.load(Ordering::Relaxed),
        "Job should have completed during graceful shutdown"
    );
    assert!(
        elapsed >= Duration::from_millis(50),
        "Worker should have waited for job completion"
    );
    assert!(
        elapsed < Duration::from_millis(100),
        "Worker should have finished before shutdown timeout"
    );
}