mod test_utils;
use hammerwork::Job;
use serde_json::json;
#[cfg(feature = "postgres")]
mod postgres_tests {
    use super::*;
    use hammerwork::queue::DatabaseQueue;
    use hammerwork::JobStatus;
    use std::time::Duration;
    use tokio::time::sleep;

    /// Walks a single job through enqueue -> dequeue -> complete -> fetch -> delete
    /// on the queue returned by `test_utils::setup_postgres_queue`.
    ///
    /// Carries `#[ignore]`, so it only runs when ignored tests are requested.
    #[tokio::test]
    #[ignore]
    async fn test_postgres_full_workflow() {
        let queue = test_utils::setup_postgres_queue().await;

        // Keep one copy of the payload so the dequeued job can be compared against it.
        let payload = json!({"message": "Hello, World!"});
        let job = Job::new("test_queue".to_string(), payload.clone());
        let job_id = job.id;
        queue.enqueue(job).await.unwrap();

        // The job just enqueued must come back with the same id and payload.
        let claimed = queue.dequeue("test_queue").await.unwrap();
        assert!(claimed.is_some());
        let claimed = claimed.unwrap();
        assert_eq!(claimed.id, job_id);
        assert_eq!(claimed.payload, payload);

        // After completion the stored record must report `Completed`.
        queue.complete_job(job_id).await.unwrap();
        let stored = queue.get_job(job_id).await.unwrap();
        assert!(stored.is_some());
        let stored = stored.unwrap();
        assert_eq!(stored.status, JobStatus::Completed);

        // Remove the row this test created.
        queue.delete_job(job_id).await.unwrap();
    }

    /// Fails a dequeued job and checks that the stored record carries the
    /// `Failed` status together with the supplied error message.
    #[tokio::test]
    #[ignore]
    async fn test_postgres_retry_workflow() {
        let queue = test_utils::setup_postgres_queue().await;

        let job = Job::new("retry_queue".to_string(), json!({"will_fail": true}));
        let job_id = job.id;
        queue.enqueue(job).await.unwrap();

        let claimed = queue.dequeue("retry_queue").await.unwrap().unwrap();
        queue
            .fail_job(claimed.id, "Intentional failure")
            .await
            .unwrap();

        let stored = queue.get_job(job_id).await.unwrap().unwrap();
        assert_eq!(stored.status, JobStatus::Failed);
        assert_eq!(stored.error_message.as_deref(), Some("Intentional failure"));

        // Remove the row this test created.
        queue.delete_job(job_id).await.unwrap();
    }

    /// Enqueues a job delayed by one second and checks that it cannot be
    /// dequeued immediately but can be dequeued after a two-second wait.
    #[tokio::test]
    #[ignore]
    async fn test_postgres_delayed_job() {
        let queue = test_utils::setup_postgres_queue().await;

        let one_second = chrono::Duration::seconds(1);
        let job = Job::with_delay(
            "delayed_queue".to_string(),
            json!({"delayed": true}),
            one_second,
        );
        let job_id = job.id;
        queue.enqueue(job).await.unwrap();

        // Before the delay elapses the queue must hand out nothing.
        let too_early = queue.dequeue("delayed_queue").await.unwrap();
        assert!(too_early.is_none());

        // Wait longer than the delay, then the job must be available.
        sleep(Duration::from_secs(2)).await;
        let ready = queue.dequeue("delayed_queue").await.unwrap();
        assert!(ready.is_some());
        let ready = ready.unwrap();
        assert_eq!(ready.id, job_id);

        // Finish and remove the row this test created.
        queue.complete_job(job_id).await.unwrap();
        queue.delete_job(job_id).await.unwrap();
    }
}
#[cfg(feature = "mysql")]
mod mysql_tests {
    use super::*;
    use hammerwork::queue::DatabaseQueue;

    /// Walks a single job through enqueue -> dequeue -> complete -> fetch -> delete
    /// on the queue returned by `test_utils::setup_mysql_queue`.
    ///
    /// Carries `#[ignore]`, so it only runs when ignored tests are requested.
    #[tokio::test]
    #[ignore]
    async fn test_mysql_full_workflow() {
        let queue = test_utils::setup_mysql_queue().await;

        let job = Job::new(
            "test_queue".to_string(),
            json!({"message": "Hello, MySQL!"}),
        );
        let job_id = job.id;
        queue.enqueue(job).await.unwrap();

        // The job just enqueued must come back with the same id and payload.
        let dequeued_job = queue
            .dequeue("test_queue")
            .await
            .unwrap()
            .expect("a job was just enqueued, so dequeue must return it");
        assert_eq!(dequeued_job.id, job_id);
        assert_eq!(dequeued_job.payload, json!({"message": "Hello, MySQL!"}));

        // After completion the stored record must report `Completed`.
        queue.complete_job(job_id).await.unwrap();
        let completed_job = queue
            .get_job(job_id)
            .await
            .unwrap()
            .expect("a completed job must still be retrievable by id");
        assert_eq!(completed_job.status, hammerwork::JobStatus::Completed);

        // Remove the row this test created.
        queue.delete_job(job_id).await.unwrap();
    }

    /// Enqueues five jobs, lets three tasks dequeue concurrently, and checks
    /// that at least one and at most three jobs were handed out, that no job
    /// was handed to more than one task, and that every enqueued job is
    /// deleted afterwards.
    #[tokio::test]
    #[ignore]
    async fn test_mysql_concurrent_dequeue() {
        let queue = test_utils::setup_mysql_queue().await;

        // Remember every id so all rows can be deleted at the end, including
        // the jobs no worker picked up.
        let mut enqueued_ids = Vec::with_capacity(5);
        for i in 0..5 {
            let job = Job::new("concurrent_queue".to_string(), json!({"index": i}));
            enqueued_ids.push(job.id);
            queue.enqueue(job).await.unwrap();
        }

        // Spawn three tasks that race to dequeue from the same queue.
        let handles: Vec<_> = (0..3)
            .map(|_| {
                let worker_queue = queue.clone();
                tokio::spawn(async move { worker_queue.dequeue("concurrent_queue").await })
            })
            .collect();

        let mut claimed_ids = Vec::new();
        for handle in handles {
            if let Some(job) = handle.await.unwrap().unwrap() {
                // Two workers receiving the same job is the failure this test
                // exists to catch.
                assert!(
                    !claimed_ids.contains(&job.id),
                    "job {:?} was dequeued by more than one worker",
                    job.id
                );
                queue.complete_job(job.id).await.unwrap();
                claimed_ids.push(job.id);
            }
        }

        assert!(!claimed_ids.is_empty());
        assert!(claimed_ids.len() <= 3);

        // Delete every job this test created so repeated runs start clean.
        // NOTE(review): assumes `delete_job` also accepts jobs that were never
        // dequeued; the other tests only delete completed or failed jobs.
        for id in enqueued_ids {
            queue.delete_job(id).await.unwrap();
        }
    }
}
/// Checks the `Job` constructors (`new`, `with_delay`) and the
/// `with_max_attempts` builder, alone and combined. No queue is involved.
#[tokio::test]
async fn test_job_creation_patterns() {
    let half_hour = chrono::Duration::minutes(30);

    // Plain constructor: the queue name is stored and max_attempts reads 3.
    let plain = Job::new("queue1".to_string(), json!({"type": "email"}));
    assert_eq!(plain.queue_name, "queue1");
    assert_eq!(plain.max_attempts, 3);

    // Delayed constructor: the schedule time lies after the creation time.
    let delayed = Job::with_delay(
        "queue2".to_string(),
        json!({"type": "notification"}),
        half_hour,
    );
    assert!(delayed.scheduled_at > delayed.created_at);

    // Builder override on a plain job.
    let plain_with_attempts =
        Job::new("queue3".to_string(), json!({"type": "report"})).with_max_attempts(5);
    assert_eq!(plain_with_attempts.max_attempts, 5);

    // Builder override on a delayed job keeps the delay in effect.
    let delayed_with_attempts =
        Job::with_delay("queue4".to_string(), json!({"type": "cleanup"}), half_hour)
            .with_max_attempts(10);
    assert_eq!(delayed_with_attempts.max_attempts, 10);
    assert!(delayed_with_attempts.scheduled_at > delayed_with_attempts.created_at);
}
#[tokio::test]
async fn test_job_status_transitions() {
let mut job = Job::new("status_test".to_string(), json!({"data": "test"}));
assert_eq!(job.status, hammerwork::JobStatus::Pending);
assert_eq!(job.attempts, 0);
job.status = hammerwork::JobStatus::Running;
job.attempts = 1;
job.started_at = Some(chrono::Utc::now());
assert_eq!(job.status, hammerwork::JobStatus::Running);
assert_eq!(job.attempts, 1);
assert!(job.started_at.is_some());
job.status = hammerwork::JobStatus::Completed;
job.completed_at = Some(chrono::Utc::now());
assert_eq!(job.status, hammerwork::JobStatus::Completed);
assert!(job.completed_at.is_some());
}
/// Checks the display text of the `Worker` and `Queue` error variants and
/// that a `serde_json` parse error converts into the `Serialization` variant.
#[tokio::test]
async fn test_error_types() {
    use hammerwork::HammerworkError;

    // Worker variant renders with the "Worker error: " prefix.
    let worker_failure = HammerworkError::Worker {
        message: String::from("Worker crashed"),
    };
    assert_eq!(worker_failure.to_string(), "Worker error: Worker crashed");

    // Queue variant renders with the "Queue error: " prefix.
    let queue_failure = HammerworkError::Queue {
        message: String::from("Connection failed"),
    };
    assert_eq!(queue_failure.to_string(), "Queue error: Connection failed");

    // Parsing this input must fail, which yields a serde_json error to convert.
    let parse_outcome: Result<serde_json::Value, _> = serde_json::from_str("invalid json");
    assert!(parse_outcome.is_err());

    let converted: HammerworkError = parse_outcome.unwrap_err().into();
    assert!(matches!(converted, HammerworkError::Serialization(_)));
}