mod setup;
use serde_json::json;
use setup::*;
use simple_queue::prelude::*;
/// Inserting a plain job returns `Some(id)` and persists exactly one row
/// under that id.
#[tokio::test]
async fn test_insert_job() {
    let ctx = TestContext::new().await;
    let queue = spawn_queue(&ctx.pool).await;

    let inserted = queue
        .insert_job(Job::new("test_queue", json!({"key": "value"})))
        .await
        .unwrap();
    assert!(inserted.is_some());

    // The returned id must correspond to a single stored row.
    let rows: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM job_queue WHERE id = $1")
        .bind(inserted.unwrap())
        .fetch_one(&ctx.pool)
        .await
        .unwrap();
    assert_eq!(rows, 1);
}
/// A job carrying a unique key is accepted and stored with that key.
#[tokio::test]
async fn test_insert_job_with_unique_key() {
    let ctx = TestContext::new().await;
    let queue = spawn_queue(&ctx.pool).await;

    let keyed_job =
        Job::new("test_queue", json!({"key": "value"})).with_unique_key("unique_key_123");
    let job_id = queue.insert_job(keyed_job).await.unwrap();
    assert!(job_id.is_some());

    // Exactly one row should be findable by the unique key.
    let rows: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM job_queue WHERE unique_key = $1")
        .bind("unique_key_123")
        .fetch_one(&ctx.pool)
        .await
        .unwrap();
    assert_eq!(rows, 1);
}
/// A second job reusing an active unique key is rejected (insert returns
/// `None`) and no extra row is created.
#[tokio::test]
async fn test_insert_job_duplicate_unique_key() {
    let ctx = TestContext::new().await;
    let queue = spawn_queue(&ctx.pool).await;

    let first =
        Job::new("test_queue", json!({"key": "value1"})).with_unique_key("unique_key_123");
    assert!(queue.insert_job(first).await.unwrap().is_some());

    // Same key, different payload: must be deduplicated.
    let second =
        Job::new("test_queue", json!({"key": "value2"})).with_unique_key("unique_key_123");
    assert!(queue.insert_job(second).await.unwrap().is_none());

    // Only the first insert landed.
    let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM job_queue WHERE unique_key = $1")
        .bind("unique_key_123")
        .fetch_one(&ctx.pool)
        .await
        .unwrap();
    assert_eq!(total, 1);
}
/// Cancelling a job frees its unique key: a new job with the same key is
/// accepted afterwards, leaving two rows (one cancelled, one fresh).
#[tokio::test]
async fn test_insert_job_duplicate_unique_key_but_cancelled() {
    let ctx = TestContext::new().await;
    let queue = spawn_queue(&ctx.pool).await;

    let original =
        Job::new("test_queue", json!({"key": "value1"})).with_unique_key("unique_key_123");
    queue.insert_job(original).await.unwrap();

    // Cancel the first job so its key no longer blocks inserts.
    queue
        .cancel_job_by_unique_key("unique_key_123")
        .await
        .unwrap();

    let replacement =
        Job::new("test_queue", json!({"key": "value2"})).with_unique_key("unique_key_123");
    assert!(queue.insert_job(replacement).await.unwrap().is_some());

    // Both the cancelled row and the replacement remain in the table.
    let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM job_queue WHERE unique_key = $1")
        .bind("unique_key_123")
        .fetch_one(&ctx.pool)
        .await
        .unwrap();
    assert_eq!(total, 2);
}
/// Cancelling by unique key transitions the job's status from "pending"
/// to "cancelled".
#[tokio::test]
async fn test_cancel_job_by_unique_key() {
    let ctx = TestContext::new().await;
    let queue = spawn_queue(&ctx.pool).await;

    // Schedule slightly in the future so the job stays pending long
    // enough to observe its status.
    let job = Job::new("test_queue", json!({"key": "value"}))
        .with_unique_key("unique_key_123")
        .with_run_at(chrono::Utc::now() + chrono::Duration::seconds(1));
    queue.insert_job(job).await.unwrap();

    let before: Option<String> =
        sqlx::query_scalar("SELECT status FROM job_queue WHERE unique_key = $1")
            .bind("unique_key_123")
            .fetch_optional(&ctx.pool)
            .await
            .unwrap();
    assert_eq!(before.as_deref(), Some("pending"));

    queue
        .cancel_job_by_unique_key("unique_key_123")
        .await
        .unwrap();

    let after: Option<String> =
        sqlx::query_scalar("SELECT status FROM job_queue WHERE unique_key = $1")
            .bind("unique_key_123")
            .fetch_optional(&ctx.pool)
            .await
            .unwrap();
    assert_eq!(after.as_deref(), Some("cancelled"));
}
#[tokio::test]
async fn test_cancel_all_jobs_by_fingerprint() {
let ctx = TestContext::new().await;
let queue = spawn_queue(&ctx.pool).await;
let set_job = |job: Job| {
job.with_fingerprint("fingerprint_123")
.with_run_at(chrono::Utc::now() + chrono::Duration::seconds(1))
};
let job1 = set_job(Job::new("test_queue", json!({"key": "value1"})));
let job2 = set_job(Job::new("test_queue", json!({"key": "value2"})));
queue.insert_job(job1).await.unwrap();
queue.insert_job(job2).await.unwrap();
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM job_queue WHERE fingerprint = $1 AND status = 'pending'",
)
.bind("fingerprint_123")
.fetch_one(&ctx.pool)
.await
.unwrap();
assert_eq!(count, 2);
queue
.cancel_all_jobs_by_fingerprint("fingerprint_123")
.await
.unwrap();
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM job_queue WHERE fingerprint = $1 AND status = 'cancelled'",
)
.bind("fingerprint_123")
.fetch_one(&ctx.pool)
.await
.unwrap();
assert_eq!(count, 2);
}
/// A batch insert of two jobs returns two ids and stores two rows.
#[tokio::test]
async fn test_insert_jobs() {
    let ctx = TestContext::new().await;
    let queue = spawn_queue(&ctx.pool).await;

    let batch = vec![
        Job::new("test_queue", json!({"key": "value1"})),
        Job::new("test_queue", json!({"key": "value2"})),
    ];
    let ids = queue.insert_jobs(batch).await.unwrap();
    assert_eq!(ids.len(), 2);

    // Both rows should be stored under the target queue.
    let stored: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM job_queue WHERE queue = $1")
        .bind("test_queue")
        .fetch_one(&ctx.pool)
        .await
        .unwrap();
    assert_eq!(stored, 2);
}
/// A batch whose jobs collide on a unique key fails atomically: the call
/// errors and nothing is written.
#[tokio::test]
async fn test_insert_jobs_conflicting() {
    let ctx = TestContext::new().await;
    let queue = spawn_queue(&ctx.pool).await;

    let conflicting = vec![
        Job::new("test_queue_conflicting", json!({"key": "value1"})).with_unique_key("key"),
        Job::new("test_queue_conflicting", json!({"key": "value2"})).with_unique_key("key"),
    ];
    assert!(queue.insert_jobs(conflicting).await.is_err());

    // The failed batch must not have left partial rows behind.
    let remaining: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM job_queue")
        .fetch_one(&ctx.pool)
        .await
        .unwrap();
    assert_eq!(remaining, 0);
}