use std::sync::Arc;
use reqwest::Client;
use serde_json::{Value, json};
use rustqueue::api::{self, AppState};
use rustqueue::engine::queue::QueueManager;
use rustqueue::storage::RedbStorage;
async fn start_test_server() -> String {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("test.redb");
let _keep = Box::leak(Box::new(dir));
let (event_tx, _) = tokio::sync::broadcast::channel(1024);
let storage = Arc::new(RedbStorage::new(&db_path).unwrap());
let qm = Arc::new(QueueManager::new(storage));
let state = Arc::new(AppState {
queue_manager: qm,
start_time: std::time::Instant::now(),
metrics_handle: None,
event_tx,
auth_config: rustqueue::config::AuthConfig::default(),
auth_rate_limiter: rustqueue::api::auth::AuthRateLimiter::new(),
webhook_manager: None,
});
let app = api::router(state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
format!("http://{addr}")
}
#[tokio::test]
async fn test_full_job_lifecycle() {
let base = start_test_server().await;
let client = Client::new();
let push_resp = client
.post(format!("{base}/api/v1/queues/lifecycle/jobs"))
.json(&json!({
"name": "process-order",
"data": {"order_id": 1234, "amount": 99.99}
}))
.send()
.await
.unwrap();
assert_eq!(push_resp.status(), 201, "push should return 201 Created");
let push_body: Value = push_resp.json().await.unwrap();
assert_eq!(push_body["ok"], true);
let job_id = push_body["id"]
.as_str()
.expect("response should contain a job id");
uuid::Uuid::parse_str(job_id).expect("id should be a valid UUID");
let pull_resp = client
.get(format!("{base}/api/v1/queues/lifecycle/jobs"))
.send()
.await
.unwrap();
assert_eq!(pull_resp.status(), 200);
let pull_body: Value = pull_resp.json().await.unwrap();
assert_eq!(pull_body["ok"], true);
assert!(
pull_body["job"].is_object(),
"pull should return a job object"
);
assert_eq!(
pull_body["job"]["id"], job_id,
"pulled job should match pushed job id"
);
assert_eq!(
pull_body["job"]["state"], "active",
"pulled job should be in active state"
);
let ack_resp = client
.post(format!("{base}/api/v1/jobs/{job_id}/ack"))
.json(&json!({"result": null}))
.send()
.await
.unwrap();
assert_eq!(ack_resp.status(), 200, "ack should return 200 OK");
let ack_body: Value = ack_resp.json().await.unwrap();
assert_eq!(ack_body["ok"], true);
let stats_resp = client
.get(format!("{base}/api/v1/queues/lifecycle/stats"))
.send()
.await
.unwrap();
assert_eq!(stats_resp.status(), 200);
let stats_body: Value = stats_resp.json().await.unwrap();
assert_eq!(stats_body["ok"], true);
assert_eq!(
stats_body["counts"]["waiting"], 0,
"no jobs should be waiting"
);
assert_eq!(
stats_body["counts"]["completed"], 1,
"one job should be completed"
);
let health_resp = client
.get(format!("{base}/api/v1/health"))
.send()
.await
.unwrap();
assert_eq!(
health_resp.status(),
200,
"health endpoint should return 200"
);
let health_body: Value = health_resp.json().await.unwrap();
assert_eq!(health_body["ok"], true);
assert_eq!(health_body["status"], "healthy");
let dlq_push_resp = client
.post(format!("{base}/api/v1/queues/lifecycle/jobs"))
.json(&json!({
"name": "fragile-task",
"data": {"fragile": true},
"max_attempts": 1
}))
.send()
.await
.unwrap();
assert_eq!(dlq_push_resp.status(), 201);
let dlq_push_body: Value = dlq_push_resp.json().await.unwrap();
let dlq_job_id = dlq_push_body["id"].as_str().unwrap();
let dlq_pull_resp = client
.get(format!("{base}/api/v1/queues/lifecycle/jobs"))
.send()
.await
.unwrap();
assert_eq!(dlq_pull_resp.status(), 200);
let dlq_pull_body: Value = dlq_pull_resp.json().await.unwrap();
assert_eq!(dlq_pull_body["job"]["id"], dlq_job_id);
let fail_resp = client
.post(format!("{base}/api/v1/jobs/{dlq_job_id}/fail"))
.json(&json!({"error": "fatal crash"}))
.send()
.await
.unwrap();
assert_eq!(fail_resp.status(), 200);
let fail_body: Value = fail_resp.json().await.unwrap();
assert_eq!(fail_body["ok"], true);
assert_eq!(
fail_body["retry"], false,
"with max_attempts=1 the job should not retry"
);
let dlq_get_resp = client
.get(format!("{base}/api/v1/jobs/{dlq_job_id}"))
.send()
.await
.unwrap();
let dlq_get_body: Value = dlq_get_resp.json().await.unwrap();
assert_eq!(
dlq_get_body["job"]["state"], "dlq",
"job should be in DLQ after exhausting retries"
);
let delayed_push_resp = client
.post(format!("{base}/api/v1/queues/delayed-q/jobs"))
.json(&json!({
"name": "future-task",
"data": {"scheduled": true},
"delay_ms": 60000
}))
.send()
.await
.unwrap();
assert_eq!(delayed_push_resp.status(), 201);
let delayed_pull_resp = client
.get(format!("{base}/api/v1/queues/delayed-q/jobs"))
.send()
.await
.unwrap();
assert_eq!(delayed_pull_resp.status(), 200);
let delayed_pull_body: Value = delayed_pull_resp.json().await.unwrap();
assert_eq!(delayed_pull_body["ok"], true);
assert!(
delayed_pull_body["job"].is_null(),
"delayed job should not be pullable immediately"
);
let unique_push1 = client
.post(format!("{base}/api/v1/queues/lifecycle/jobs"))
.json(&json!({
"name": "deduplicated-task",
"data": {"attempt": 1},
"unique_key": "order-42"
}))
.send()
.await
.unwrap();
assert_eq!(
unique_push1.status(),
201,
"first push with unique key should succeed"
);
let unique_push2 = client
.post(format!("{base}/api/v1/queues/lifecycle/jobs"))
.json(&json!({
"name": "deduplicated-task",
"data": {"attempt": 2},
"unique_key": "order-42"
}))
.send()
.await
.unwrap();
assert_eq!(
unique_push2.status(),
409,
"second push with same unique key should return 409 Conflict"
);
let unique_push2_body: Value = unique_push2.json().await.unwrap();
assert_eq!(unique_push2_body["ok"], false);
assert_eq!(unique_push2_body["error"]["code"], "DUPLICATE_KEY");
}