use hammerwork::queue::DatabaseQueue;
use hammerwork::stats::{InMemoryStatsCollector, StatisticsCollector};
use hammerwork::{HammerworkError, Job, JobQueue, JobStatus, Result, Worker, WorkerPool};
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::{sleep, timeout};
use tracing::{error, info};
/// Reusable integration-test scenarios for a hammerwork job-queue backend.
///
/// Generic over the SQLx database type so the same scenario suite can run
/// against any backend whose `JobQueue<DB>` implements `DatabaseQueue`.
#[derive(Clone)]
pub struct TestScenarios<DB>
where
DB: sqlx::Database + Send + Sync + 'static,
JobQueue<DB>: DatabaseQueue<Database = DB> + Send + Sync,
{
/// Shared handle to the queue under test; cloned into spawned tasks/workers.
pub queue: Arc<JobQueue<DB>>,
}
impl<DB> TestScenarios<DB>
where
DB: sqlx::Database + Send + Sync + 'static,
JobQueue<DB>: DatabaseQueue<Database = DB> + Send + Sync,
{
pub fn new(queue: Arc<JobQueue<DB>>) -> Self {
Self { queue }
}
pub async fn test_basic_job_lifecycle(&self) -> Result<()> {
info!("๐งช Testing basic job lifecycle");
let job = Job::new(
"test_basic".to_string(),
json!({
"message": "Basic lifecycle test",
"timestamp": chrono::Utc::now().timestamp()
}),
);
let job_id = job.id;
self.queue.enqueue(job).await?;
info!("โ
Job enqueued: {}", job_id);
let dequeued_job = self.queue.dequeue("test_basic").await?;
assert!(dequeued_job.is_some(), "Job should be dequeued");
let dequeued_job = dequeued_job.unwrap();
assert_eq!(dequeued_job.id, job_id);
info!("โ
Job dequeued: {}", job_id);
self.queue.complete_job(job_id).await?;
info!("โ
Job completed: {}", job_id);
let completed_job = self.queue.get_job(job_id).await?;
assert!(completed_job.is_some());
let completed_job = completed_job.unwrap();
assert_eq!(completed_job.status, hammerwork::JobStatus::Completed);
info!("โ
Job status verified as completed");
self.queue.delete_job(job_id).await?;
info!("โ
Basic job lifecycle test completed");
Ok(())
}
/// Verifies that a job created with `Job::with_delay` is invisible to
/// `dequeue` until its scheduled time has passed.
///
/// NOTE(review): relies on real wall-clock sleeps (2s delay, 3s wait), so this
/// scenario is timing-sensitive on slow or heavily loaded CI hosts.
pub async fn test_delayed_jobs(&self) -> Result<()> {
info!("๐งช Testing delayed job scheduling");
// Schedule the job two seconds into the future.
let delay = chrono::Duration::seconds(2);
let job = Job::with_delay(
"test_delayed".to_string(),
json!({
"message": "Delayed job test",
"delay_seconds": 2
}),
delay,
);
let job_id = job.id;
self.queue.enqueue(job).await?;
info!("โ
Delayed job enqueued: {}", job_id);
// An immediate dequeue must not return the job yet.
let immediate_result = self.queue.dequeue("test_delayed").await?;
assert!(
immediate_result.is_none(),
"Delayed job should not be available immediately"
);
info!("โ
Delayed job correctly not available immediately");
// Wait past the delay window; the job should then be dequeueable.
sleep(Duration::from_secs(3)).await;
let delayed_result = self.queue.dequeue("test_delayed").await?;
assert!(
delayed_result.is_some(),
"Delayed job should be available after delay"
);
let delayed_job = delayed_result.unwrap();
assert_eq!(delayed_job.id, job_id);
info!("โ
Delayed job available after delay period");
// Clean up.
self.queue.complete_job(job_id).await?;
self.queue.delete_job(job_id).await?;
info!("โ
Delayed job test completed");
Ok(())
}
pub async fn test_job_retries(&self) -> Result<()> {
info!("๐งช Testing job retry mechanism");
let job = Job::new(
"test_retry".to_string(),
json!({
"message": "Retry test job",
"should_fail": true
}),
)
.with_max_attempts(3);
let job_id = job.id;
self.queue.enqueue(job).await?;
info!("โ
Retry test job enqueued: {}", job_id);
let dequeued_job = self.queue.dequeue("test_retry").await?;
assert!(dequeued_job.is_some());
let dequeued_job = dequeued_job.unwrap();
assert_eq!(dequeued_job.attempts, 1);
self.queue
.fail_job(job_id, "Simulated failure for retry test")
.await?;
info!("โ
Job failed with error message");
let failed_job = self.queue.get_job(job_id).await?;
assert!(failed_job.is_some());
let failed_job = failed_job.unwrap();
assert_eq!(failed_job.status, hammerwork::JobStatus::Failed);
assert!(failed_job.error_message.is_some());
info!("โ
Job status verified as failed with error message");
self.queue.delete_job(job_id).await?;
info!("โ
Job retry test completed");
Ok(())
}
/// Spins up a two-worker `WorkerPool` against the "worker_test" queue and
/// checks that at least one of five enqueued jobs is processed within ~2s.
///
/// NOTE(review): the count is asserted as `> 0`, not `== 5`, so the test
/// tolerates timing variance; the pool task is stopped via `abort()`.
pub async fn test_worker_pool(&self) -> Result<()> {
info!("๐งช Testing worker pool functionality");
// Shared counter bumped by the handler for every processed job.
let job_counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
let counter_clone = job_counter.clone();
// The handler returns a pinned boxed future, as the Worker API requires.
let handler = Arc::new(move |job: Job| {
let counter = counter_clone.clone();
Box::pin(async move {
info!("Processing job: {} with payload: {}", job.id, job.payload);
counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
// Simulate a small amount of work per job.
sleep(Duration::from_millis(100)).await;
Ok(()) as Result<()>
})
as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
});
// Two workers polling the same queue exercise work distribution.
let worker1 = Worker::new(
self.queue.clone(),
"worker_test".to_string(),
handler.clone(),
)
.with_poll_interval(Duration::from_millis(100))
.with_max_retries(2);
let worker2 = Worker::new(
self.queue.clone(),
"worker_test".to_string(),
handler.clone(),
)
.with_poll_interval(Duration::from_millis(100))
.with_max_retries(2);
// Enqueue five jobs for the pool to chew through.
for i in 0..5 {
let job = Job::new(
"worker_test".to_string(),
json!({
"job_number": i,
"message": format!("Worker test job {}", i)
}),
);
self.queue.enqueue(job).await?;
}
info!("โ
Enqueued 5 test jobs for worker pool");
let mut pool = WorkerPool::new();
pool.add_worker(worker1);
pool.add_worker(worker2);
// Run the pool in the background, give it time to process, then stop it.
let pool_task = tokio::spawn(async move { pool.start().await });
sleep(Duration::from_secs(2)).await;
let processed_count = job_counter.load(std::sync::atomic::Ordering::SeqCst);
info!("โ
Worker pool processed {} jobs", processed_count);
assert!(
processed_count > 0,
"Worker pool should have processed at least one job"
);
pool_task.abort();
info!("โ
Worker pool test completed");
Ok(())
}
/// Enqueues 10 jobs, then spawns three tasks that each try to dequeue and
/// complete a single job concurrently, asserting at least one succeeded.
pub async fn test_concurrent_processing(&self) -> Result<()> {
info!("๐งช Testing concurrent job processing");
// Remember every job id so we can clean up at the end.
let mut job_ids = Vec::new();
for i in 0..10 {
let job = Job::new(
"concurrent_test".to_string(),
json!({
"index": i,
"message": format!("Concurrent job {}", i)
}),
);
job_ids.push(job.id);
self.queue.enqueue(job).await?;
}
info!("โ
Enqueued 10 jobs for concurrent processing test");
// Each spawned task attempts to process at most one job.
let mut handles = Vec::new();
for _ in 0..3 {
let queue_clone = self.queue.clone();
let handle = tokio::spawn(async move {
if let Ok(Some(job)) = queue_clone.dequeue("concurrent_test").await {
// Simulate a short unit of work before completing.
sleep(Duration::from_millis(50)).await;
let _ = queue_clone.complete_job(job.id).await;
return Some(job.id);
}
None
});
handles.push(handle);
}
// Count how many tasks actually obtained and completed a job.
let mut processed_jobs = 0;
for handle in handles {
if let Ok(Some(_job_id)) = handle.await {
processed_jobs += 1;
}
}
info!("โ
Concurrently processed {} jobs", processed_jobs);
assert!(
processed_jobs > 0,
"Should have processed at least one job concurrently"
);
// Best-effort cleanup; deletion errors are deliberately ignored.
for job_id in job_ids {
let _ = self.queue.delete_job(job_id).await;
}
info!("โ
Concurrent processing test completed");
Ok(())
}
/// Covers edge cases that must succeed gracefully rather than error:
/// dequeueing from an unknown queue, looking up a random UUID, and deleting
/// a job that does not exist.
pub async fn test_error_handling(&self) -> Result<()> {
info!("๐งช Testing error handling and edge cases");
// Dequeue from a queue that has never had a job.
let empty_result = self.queue.dequeue("nonexistent_queue").await?;
assert!(empty_result.is_none(), "Empty queue should return None");
info!("โ
Empty queue correctly returns None");
// Look up a freshly generated UUID that cannot match any stored job.
let fake_id = uuid::Uuid::new_v4();
let nonexistent_job = self.queue.get_job(fake_id).await?;
assert!(
nonexistent_job.is_none(),
"Non-existent job should return None"
);
info!("โ
Non-existent job correctly returns None");
// Deleting a missing job must be a no-op, not an error.
let delete_result = self.queue.delete_job(fake_id).await;
assert!(
delete_result.is_ok(),
"Deleting non-existent job should not error"
);
info!("โ
Deleting non-existent job handled gracefully");
info!("โ
Error handling test completed");
Ok(())
}
/// Drives a job through fail -> retry -> fail -> dead, then verifies the
/// dead-job listing/summary APIs and that `retry_dead_job` resets the job
/// back to `Pending` with zeroed attempts.
///
/// NOTE(review): uses a 1s retry delay plus a 2s sleep between attempts, so
/// this scenario is timing-sensitive like `test_delayed_jobs`.
pub async fn test_dead_job_management(&self) -> Result<()> {
info!("๐งช Testing dead job management");
// Cap attempts at 2 so the second failure exhausts the job.
let job = Job::new(
"dead_job_test".to_string(),
json!({
"message": "This job will become dead",
"should_fail": true
}),
)
.with_max_attempts(2);
let job_id = job.id;
self.queue.enqueue(job).await?;
info!("โ
Dead job test job enqueued: {}", job_id);
// Attempt 1: fail and schedule a retry. Attempt 2: mark the job dead.
for attempt in 1..=2 {
let dequeued_job = self.queue.dequeue("dead_job_test").await?;
assert!(dequeued_job.is_some());
let dequeued_job = dequeued_job.unwrap();
// The queue is expected to track the attempt counter per dequeue.
assert_eq!(dequeued_job.attempts, attempt);
if attempt == 2 {
self.queue
.mark_job_dead(job_id, "Exhausted all retries")
.await?;
info!("โ
Job marked as dead after {} attempts", attempt);
} else {
self.queue
.fail_job(job_id, &format!("Simulated failure attempt {}", attempt))
.await?;
// Schedule the retry 1s out, then sleep 2s so it becomes due.
let retry_at = chrono::Utc::now() + chrono::Duration::seconds(1);
self.queue.retry_job(job_id, retry_at).await?;
info!("โ
Job failed and scheduled for retry, attempt {}", attempt);
sleep(Duration::from_secs(2)).await;
}
}
// The job must now be persisted as dead with a failure timestamp.
let dead_job = self.queue.get_job(job_id).await?;
assert!(dead_job.is_some());
let dead_job = dead_job.unwrap();
assert_eq!(dead_job.status, JobStatus::Dead);
assert!(dead_job.failed_at.is_some());
info!("โ
Job status verified as dead");
// The dead-jobs listing should include our job.
let dead_jobs = self.queue.get_dead_jobs(Some(10), None).await?;
assert!(!dead_jobs.is_empty());
let found_dead_job = dead_jobs.iter().find(|j| j.id == job_id);
assert!(found_dead_job.is_some());
info!("โ
Dead job found in dead jobs list");
// The aggregate summary should count it under its queue name.
let dead_summary = self.queue.get_dead_job_summary().await?;
assert!(dead_summary.total_dead_jobs > 0);
assert!(
dead_summary
.dead_jobs_by_queue
.contains_key("dead_job_test")
);
info!("โ
Dead job summary contains expected data");
// Retrying a dead job must reset it to a clean pending state.
self.queue.retry_dead_job(job_id).await?;
let retried_job = self.queue.get_job(job_id).await?;
assert!(retried_job.is_some());
let retried_job = retried_job.unwrap();
assert_eq!(retried_job.status, JobStatus::Pending);
assert_eq!(retried_job.attempts, 0);
assert!(retried_job.failed_at.is_none());
info!("โ
Dead job successfully retried");
self.queue.delete_job(job_id).await?;
info!("โ
Dead job management test completed");
Ok(())
}
/// Runs a worker with an attached `InMemoryStatsCollector` over a mix of
/// succeeding and failing jobs, then verifies system-wide, per-queue, and
/// all-queue statistics were recorded.
///
/// NOTE(review): jobs where `i % 3 == 0` (indices 0 and 3) are flagged to
/// fail, so both completed and failed counts should be exercised.
pub async fn test_statistics_collection(&self) -> Result<()> {
info!("๐งช Testing statistics collection");
// Shared collector handed to both the worker and the pool.
let stats_collector =
Arc::new(InMemoryStatsCollector::new_default()) as Arc<dyn StatisticsCollector>;
let stats_clone = Arc::clone(&stats_collector);
// Handler fails whenever the payload carries `"should_fail": true`.
let handler = Arc::new(move |job: Job| {
let _stats = stats_clone.clone();
Box::pin(async move {
info!("Processing statistics test job: {}", job.id);
sleep(Duration::from_millis(100)).await;
if job
.payload
.get("should_fail")
.and_then(|v| v.as_bool())
.unwrap_or(false)
{
return Err(HammerworkError::Worker {
message: "Simulated failure for statistics test".to_string(),
});
}
Ok(())
})
as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
});
let worker = Worker::new(self.queue.clone(), "stats_test".to_string(), handler)
.with_poll_interval(Duration::from_millis(100))
.with_max_retries(2)
.with_stats_collector(Arc::clone(&stats_collector));
// Enqueue five jobs; indices divisible by 3 are marked to fail.
let mut job_ids = Vec::new();
for i in 0..5 {
let job = Job::new(
"stats_test".to_string(),
json!({
"index": i,
"should_fail": i % 3 == 0 }),
);
job_ids.push(job.id);
self.queue.enqueue(job).await?;
}
info!("โ
Enqueued 5 jobs for statistics test");
let mut pool = WorkerPool::new().with_stats_collector(Arc::clone(&stats_collector));
pool.add_worker(worker);
// Let the pool process for 3s, then stop it.
let pool_task = tokio::spawn(async move { pool.start().await });
sleep(Duration::from_secs(3)).await;
pool_task.abort();
// System-wide statistics over the last 5 minutes.
let system_stats = stats_collector
.get_system_statistics(Duration::from_secs(300))
.await?;
info!(
"System stats - Total: {}, Completed: {}, Failed: {}",
system_stats.total_processed, system_stats.completed, system_stats.failed
);
assert!(
system_stats.total_processed > 0,
"Should have processed some jobs"
);
// Per-queue statistics for the queue we used.
let queue_stats = stats_collector
.get_queue_statistics("stats_test", Duration::from_secs(300))
.await?;
assert!(
queue_stats.total_processed > 0,
"Queue should have processed some jobs"
);
info!("โ
Queue-specific statistics collected successfully");
// Statistics across all queues must include ours.
let all_stats = stats_collector
.get_all_statistics(Duration::from_secs(300))
.await?;
assert!(
!all_stats.is_empty(),
"Should have statistics for at least one queue"
);
let stats_test_queue = all_stats.iter().find(|s| s.queue_name == "stats_test");
assert!(
stats_test_queue.is_some(),
"Should have statistics for stats_test queue"
);
info!("โ
All queue statistics collected successfully");
// Best-effort cleanup of the enqueued jobs.
for job_id in job_ids {
let _ = self.queue.delete_job(job_id).await;
}
info!("โ
Statistics collection test completed");
Ok(())
}
/// Verifies the database-backed statistics APIs: per-queue stats, all-queue
/// stats, and job counts by status, across two distinct queues.
pub async fn test_database_queue_stats(&self) -> Result<()> {
info!("๐งช Testing database queue statistics");
let mut job_ids = Vec::new();
// Three jobs in one queue, two in another, so both appear in the stats.
for i in 0..3 {
let job = Job::new("email_queue".to_string(), json!({ "email_id": i }));
job_ids.push(job.id);
self.queue.enqueue(job).await?;
}
for i in 0..2 {
let job = Job::new(
"notification_queue".to_string(),
json!({ "notification_id": i }),
);
job_ids.push(job.id);
self.queue.enqueue(job).await?;
}
info!("โ
Enqueued jobs in multiple queues");
// Complete one job from each queue so both pending and completed counts
// can be non-zero.
if let Some(job) = self.queue.dequeue("email_queue").await? {
self.queue.complete_job(job.id).await?;
}
if let Some(job) = self.queue.dequeue("notification_queue").await? {
self.queue.complete_job(job.id).await?;
}
// Per-queue stats for the email queue.
let email_stats = self.queue.get_queue_stats("email_queue").await?;
assert_eq!(email_stats.queue_name, "email_queue");
assert!(email_stats.pending_count > 0 || email_stats.completed_count > 0);
info!(
"โ
Email queue statistics: pending={}, completed={}",
email_stats.pending_count, email_stats.completed_count
);
// Stats across all queues must include both of ours.
let all_queue_stats = self.queue.get_all_queue_stats().await?;
assert!(
all_queue_stats.len() >= 2,
"Should have stats for at least 2 queues"
);
let email_found = all_queue_stats
.iter()
.any(|s| s.queue_name == "email_queue");
let notification_found = all_queue_stats
.iter()
.any(|s| s.queue_name == "notification_queue");
assert!(
email_found && notification_found,
"Should have stats for both queues"
);
info!("โ
All queue statistics retrieved successfully");
// Per-status counts for a single queue.
let status_counts = self.queue.get_job_counts_by_status("email_queue").await?;
assert!(!status_counts.is_empty(), "Should have status counts");
info!("โ
Job status counts: {:?}", status_counts);
// Best-effort cleanup of everything we enqueued.
for job_id in job_ids {
let _ = self.queue.delete_job(job_id).await;
}
info!("โ
Database queue statistics test completed");
Ok(())
}
pub async fn run_all_tests(&self) -> Result<()> {
info!("๐ Starting comprehensive integration tests");
let mut passed = 0;
let mut failed = 0;
info!("๐งช Running test: Basic Job Lifecycle");
match timeout(Duration::from_secs(30), self.test_basic_job_lifecycle()).await {
Ok(Ok(())) => {
info!("โ
Basic Job Lifecycle - PASSED");
passed += 1;
}
Ok(Err(e)) => {
error!("โ Basic Job Lifecycle - FAILED: {}", e);
failed += 1;
}
Err(_) => {
error!("โ Basic Job Lifecycle - TIMEOUT");
failed += 1;
}
}
info!("๐งช Running test: Delayed Jobs");
match timeout(Duration::from_secs(30), self.test_delayed_jobs()).await {
Ok(Ok(())) => {
info!("โ
Delayed Jobs - PASSED");
passed += 1;
}
Ok(Err(e)) => {
error!("โ Delayed Jobs - FAILED: {}", e);
failed += 1;
}
Err(_) => {
error!("โ Delayed Jobs - TIMEOUT");
failed += 1;
}
}
info!("๐งช Running test: Job Retries");
match timeout(Duration::from_secs(30), self.test_job_retries()).await {
Ok(Ok(())) => {
info!("โ
Job Retries - PASSED");
passed += 1;
}
Ok(Err(e)) => {
error!("โ Job Retries - FAILED: {}", e);
failed += 1;
}
Err(_) => {
error!("โ Job Retries - TIMEOUT");
failed += 1;
}
}
info!("๐งช Running test: Worker Pool");
match timeout(Duration::from_secs(30), self.test_worker_pool()).await {
Ok(Ok(())) => {
info!("โ
Worker Pool - PASSED");
passed += 1;
}
Ok(Err(e)) => {
error!("โ Worker Pool - FAILED: {}", e);
failed += 1;
}
Err(_) => {
error!("โ Worker Pool - TIMEOUT");
failed += 1;
}
}
info!("๐งช Running test: Concurrent Processing");
match timeout(Duration::from_secs(30), self.test_concurrent_processing()).await {
Ok(Ok(())) => {
info!("โ
Concurrent Processing - PASSED");
passed += 1;
}
Ok(Err(e)) => {
error!("โ Concurrent Processing - FAILED: {}", e);
failed += 1;
}
Err(_) => {
error!("โ Concurrent Processing - TIMEOUT");
failed += 1;
}
}
info!("๐งช Running test: Error Handling");
match timeout(Duration::from_secs(30), self.test_error_handling()).await {
Ok(Ok(())) => {
info!("โ
Error Handling - PASSED");
passed += 1;
}
Ok(Err(e)) => {
error!("โ Error Handling - FAILED: {}", e);
failed += 1;
}
Err(_) => {
error!("โ Error Handling - TIMEOUT");
failed += 1;
}
}
info!("๐งช Running test: Dead Job Management");
match timeout(Duration::from_secs(45), self.test_dead_job_management()).await {
Ok(Ok(())) => {
info!("โ
Dead Job Management - PASSED");
passed += 1;
}
Ok(Err(e)) => {
error!("โ Dead Job Management - FAILED: {}", e);
failed += 1;
}
Err(_) => {
error!("โ Dead Job Management - TIMEOUT");
failed += 1;
}
}
info!("๐งช Running test: Statistics Collection");
match timeout(Duration::from_secs(45), self.test_statistics_collection()).await {
Ok(Ok(())) => {
info!("โ
Statistics Collection - PASSED");
passed += 1;
}
Ok(Err(e)) => {
error!("โ Statistics Collection - FAILED: {}", e);
failed += 1;
}
Err(_) => {
error!("โ Statistics Collection - TIMEOUT");
failed += 1;
}
}
info!("๐งช Running test: Database Queue Statistics");
match timeout(Duration::from_secs(30), self.test_database_queue_stats()).await {
Ok(Ok(())) => {
info!("โ
Database Queue Statistics - PASSED");
passed += 1;
}
Ok(Err(e)) => {
error!("โ Database Queue Statistics - FAILED: {}", e);
failed += 1;
}
Err(_) => {
error!("โ Database Queue Statistics - TIMEOUT");
failed += 1;
}
}
info!(
"๐ Integration tests completed: {} passed, {} failed",
passed, failed
);
if failed > 0 {
return Err(HammerworkError::Worker {
message: format!("{} tests failed", failed),
});
}
Ok(())
}
}