use std::sync::Arc;
use reqwest::Client;
use serde_json::{Value, json};
use rustqueue::api::{self, AppState};
use rustqueue::engine::queue::QueueManager;
use rustqueue::storage::MemoryStorage;
/// Starts an in-process API server for a single test.
///
/// A [`QueueManager`] over [`MemoryStorage`] is created with a broadcast event
/// sender and a maximum DAG depth of 5, wrapped in an [`AppState`] with default
/// auth configuration, no metrics handle and no webhook manager, and served by
/// axum on a spawned task.
///
/// Returns the base URL (`http://<bound address>`) and a shared handle to the
/// queue manager used by the server.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
async fn start_test_server() -> (String, Arc<QueueManager>) {
    // Only the sender half is kept; the initial receiver is dropped right away.
    let (event_tx, _) = tokio::sync::broadcast::channel(1024);

    // Port 0 is requested, so the real address is read back from the listener.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound_addr = listener.local_addr().unwrap();

    let storage = Arc::new(MemoryStorage::new());
    let manager = QueueManager::new(storage)
        .with_event_sender(event_tx.clone())
        .with_max_dag_depth(5);
    let qm = Arc::new(manager);

    let shared_state = Arc::new(AppState {
        queue_manager: Arc::clone(&qm),
        start_time: std::time::Instant::now(),
        metrics_handle: None,
        event_tx,
        auth_config: rustqueue::config::AuthConfig::default(),
        auth_rate_limiter: rustqueue::api::auth::AuthRateLimiter::new(),
        webhook_manager: None,
    });
    let router = api::router(shared_state);

    // The join handle is discarded, so the server task runs detached.
    tokio::spawn(async move {
        axum::serve(listener, router).await.unwrap();
    });

    let base_url = format!("http://{}", bound_addr);
    (base_url, qm)
}
/// POSTs `body` to `{base}/api/v1/queues/{queue}/jobs` and returns the `id`
/// string from the JSON response.
///
/// # Panics
///
/// Panics if the request fails, the status is not 201, the response is not
/// valid JSON, or the response has no string `id` field.
async fn push_job(client: &Client, base: &str, queue: &str, body: Value) -> String {
    let url = format!("{base}/api/v1/queues/{queue}/jobs");
    let response = client.post(url).json(&body).send().await.unwrap();
    assert_eq!(response.status(), 201, "push should return 201");

    let payload: Value = response.json().await.unwrap();
    let job_id = payload["id"].as_str().unwrap();
    job_id.to_owned()
}
/// GETs `{base}/api/v1/jobs/{id}` and returns the parsed JSON response body.
///
/// # Panics
///
/// Panics if the request fails, the status is not 200, or the body is not
/// valid JSON.
async fn get_job(client: &Client, base: &str, id: &str) -> Value {
    let url = format!("{base}/api/v1/jobs/{id}");
    let response = client.get(url).send().await.unwrap();
    assert_eq!(response.status(), 200);

    let payload: Value = response.json().await.unwrap();
    payload
}
/// POSTs to `{base}/api/v1/jobs/{id}/ack` with no request body.
///
/// # Panics
///
/// Panics if the request fails or the status is not 200.
async fn ack_job(client: &Client, base: &str, id: &str) {
    let url = format!("{base}/api/v1/jobs/{id}/ack");
    let response = client.post(url).send().await.unwrap();
    assert_eq!(response.status(), 200, "ack should return 200");
}
async fn pull_job(client: &Client, base: &str, queue: &str) -> Option<Value> {
let resp = client
.get(format!("{base}/api/v1/queues/{queue}/jobs"))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: Value = resp.json().await.unwrap();
if body["job"].is_object() {
Some(body["job"].clone())
} else {
None
}
}
async fn fail_job(client: &Client, base: &str, id: &str) {
let resp = client
.post(format!("{base}/api/v1/jobs/{id}/fail"))
.json(&json!({"error": "test failure"}))
.send()
.await
.unwrap();
assert!(
resp.status() == 200 || resp.status() == 409,
"fail returned unexpected status: {}",
resp.status()
);
}
/// A job pushed with `depends_on` reports state `blocked` and cannot be
/// pulled; after its parent is acked it reports `waiting` and can be pulled.
#[tokio::test]
async fn test_child_blocked_until_parent_ack() {
    let (base, _qm) = start_test_server().await;
    let client = Client::new();

    let parent_spec = json!({
        "name": "parent-job",
        "data": {"step": "parent"}
    });
    let parent_id = push_job(&client, &base, "dag", parent_spec).await;

    let child_spec = json!({
        "name": "child-job",
        "data": {"step": "child"},
        "depends_on": [parent_id]
    });
    let child_id = push_job(&client, &base, "dag", child_spec).await;

    let child_before = get_job(&client, &base, &child_id).await;
    assert_eq!(
        child_before["job"]["state"], "blocked",
        "child should start as Blocked"
    );

    // First pull yields the parent; the second must come back empty because
    // the only remaining job is the blocked child.
    let first_pull = pull_job(&client, &base, "dag")
        .await
        .expect("should pull parent");
    assert_eq!(first_pull["id"], parent_id);

    let second_pull = pull_job(&client, &base, "dag").await;
    assert!(
        second_pull.is_none(),
        "child should not be pullable while Blocked"
    );

    ack_job(&client, &base, &parent_id).await;

    let child_after = get_job(&client, &base, &child_id).await;
    assert_eq!(
        child_after["job"]["state"], "waiting",
        "child should be Waiting after parent ack"
    );

    let third_pull = pull_job(&client, &base, "dag")
        .await
        .expect("should pull child now");
    assert_eq!(third_pull["id"], child_id);
}
#[tokio::test]
async fn test_chain_a_b_c() {
let (base, _qm) = start_test_server().await;
let client = Client::new();
let a_id = push_job(
&client,
&base,
"chain",
json!({
"name": "step-a",
"data": {},
"flow_id": "pipeline-1"
}),
)
.await;
let b_id = push_job(
&client,
&base,
"chain",
json!({
"name": "step-b",
"data": {},
"depends_on": [a_id],
"flow_id": "pipeline-1"
}),
)
.await;
let c_id = push_job(
&client,
&base,
"chain",
json!({
"name": "step-c",
"data": {},
"depends_on": [b_id],
"flow_id": "pipeline-1"
}),
)
.await;
assert_eq!(
get_job(&client, &base, &b_id).await["job"]["state"],
"blocked"
);
assert_eq!(
get_job(&client, &base, &c_id).await["job"]["state"],
"blocked"
);
let pulled_a = pull_job(&client, &base, "chain")
.await
.expect("should pull A");
assert_eq!(pulled_a["id"], a_id);
ack_job(&client, &base, &a_id).await;
assert_eq!(
get_job(&client, &base, &b_id).await["job"]["state"],
"waiting"
);
assert_eq!(
get_job(&client, &base, &c_id).await["job"]["state"],
"blocked"
);
let pulled_b = pull_job(&client, &base, "chain")
.await
.expect("should pull B");
assert_eq!(pulled_b["id"], b_id);
ack_job(&client, &base, &b_id).await;
assert_eq!(
get_job(&client, &base, &c_id).await["job"]["state"],
"waiting"
);
let pulled_c = pull_job(&client, &base, "chain")
.await
.expect("should pull C");
assert_eq!(pulled_c["id"], c_id);
ack_job(&client, &base, &c_id).await;
assert_eq!(
get_job(&client, &base, &c_id).await["job"]["state"],
"completed"
);
}
/// Pushes A and B (B depends on A), then checks that a push depending on the
/// all-zero UUID is not accepted with 201 and that A and B keep their states.
///
/// NOTE(review): despite the name, this body never submits an actual cycle;
/// the rejected request references a dependency id that was never pushed.
/// Confirm whether a real cycle can be expressed through this API.
#[tokio::test]
async fn test_cycle_detection() {
    let (base, _qm) = start_test_server().await;
    let client = Client::new();

    let spec_a = json!({
        "name": "node-a",
        "data": {}
    });
    let a_id = push_job(&client, &base, "cycle", spec_a).await;

    let spec_b = json!({
        "name": "node-b",
        "data": {},
        "depends_on": [a_id]
    });
    let b_id = push_job(&client, &base, "cycle", spec_b).await;

    // Sent by hand rather than via `push_job`, which would assert a 201.
    let bad_spec = json!({
        "name": "self-dep",
        "data": {},
        "depends_on": ["00000000-0000-0000-0000-000000000000"]
    });
    let url = format!("{base}/api/v1/queues/cycle/jobs");
    let response = client.post(url).json(&bad_spec).send().await.unwrap();
    assert_ne!(response.status(), 201, "should reject non-existent dep");

    // The two valid jobs are unchanged by the rejected request.
    let job_a = get_job(&client, &base, &a_id).await;
    assert_eq!(job_a["job"]["state"], "waiting");

    let job_b = get_job(&client, &base, &b_id).await;
    assert_eq!(job_b["job"]["state"], "blocked");
}
/// Builds a linear dependency chain `depth-0` .. `depth-5` (each job depending
/// on the previous one) and checks that one further dependant is not accepted
/// with 201.
///
/// The test server in this file is configured with `with_max_dag_depth(5)`.
#[tokio::test]
async fn test_max_depth_exceeded() {
    let (base, _qm) = start_test_server().await;
    let client = Client::new();

    let root_spec = json!({
        "name": "depth-0",
        "data": {}
    });
    let mut prev_id = push_job(&client, &base, "deep", root_spec).await;

    // Every one of these pushes must succeed; `push_job` asserts a 201.
    for depth in 1..=5 {
        let link_spec = json!({
            "name": format!("depth-{depth}"),
            "data": {},
            "depends_on": [prev_id]
        });
        prev_id = push_job(&client, &base, "deep", link_spec).await;
    }

    // One level too many: sent by hand so a non-201 does not panic in a helper.
    let too_deep_spec = json!({
        "name": "depth-6-too-deep",
        "data": {},
        "depends_on": [prev_id]
    });
    let url = format!("{base}/api/v1/queues/deep/jobs");
    let response = client
        .post(url)
        .json(&too_deep_spec)
        .send()
        .await
        .unwrap();

    assert_ne!(
        response.status(),
        201,
        "should reject job exceeding max DAG depth"
    );
}
/// When a parent job ends up in the DLQ, a child that depends on it is moved
/// to the DLQ as well.
///
/// The parent is pushed with `max_attempts: 1`, pulled, and failed once. The
/// test then requires the parent to report state `dlq` before checking the
/// child; previously the child assertion was skipped whenever the parent was
/// in any other state, which let the test pass without verifying anything.
///
/// NOTE(review): this assumes a single failure with `max_attempts: 1` sends
/// the job to the DLQ — confirm against the queue manager's retry policy.
#[tokio::test]
async fn test_parent_dlq_cascades_to_child() {
    let (base, _qm) = start_test_server().await;
    let client = Client::new();

    let parent_spec = json!({
        "name": "parent",
        "data": {},
        "max_attempts": 1
    });
    let parent_id = push_job(&client, &base, "cascade", parent_spec).await;

    let child_spec = json!({
        "name": "child",
        "data": {},
        "depends_on": [parent_id]
    });
    let child_id = push_job(&client, &base, "cascade", child_spec).await;

    let child_before = get_job(&client, &base, &child_id).await;
    assert_eq!(child_before["job"]["state"], "blocked");

    let pulled = pull_job(&client, &base, "cascade")
        .await
        .expect("should pull parent");
    assert_eq!(pulled["id"], parent_id);

    fail_job(&client, &base, &parent_id).await;

    // Precondition for the cascade check: fail loudly instead of silently
    // skipping the child assertion when the parent did not reach the DLQ.
    let parent_after = get_job(&client, &base, &parent_id).await;
    assert_eq!(
        parent_after["job"]["state"], "dlq",
        "parent should be in DLQ after failing with max_attempts = 1"
    );

    let child_after = get_job(&client, &base, &child_id).await;
    assert_eq!(
        child_after["job"]["state"], "dlq",
        "child should be cascaded to DLQ when parent goes to DLQ"
    );
}
/// A child pushed after its only dependency has already been pulled and acked
/// reports state `waiting` straight away.
#[tokio::test]
async fn test_dep_already_completed() {
    let (base, _qm) = start_test_server().await;
    let client = Client::new();

    let parent_spec = json!({
        "name": "parent",
        "data": {}
    });
    let parent_id = push_job(&client, &base, "pre-done", parent_spec).await;

    // Finish the parent before the child exists.
    let _pulled = pull_job(&client, &base, "pre-done")
        .await
        .expect("should pull parent");
    ack_job(&client, &base, &parent_id).await;

    let child_spec = json!({
        "name": "child",
        "data": {},
        "depends_on": [parent_id]
    });
    let child_id = push_job(&client, &base, "pre-done", child_spec).await;

    let child_job = get_job(&client, &base, &child_id).await;
    assert_eq!(
        child_job["job"]["state"], "waiting",
        "child should be Waiting when all deps already completed"
    );
}
/// `GET /api/v1/flows/{flow_id}` returns every job pushed with that flow id
/// plus a summary of counts per state.
///
/// The flow holds one root job and two jobs that both depend on it, so the
/// expected summary is 3 total, 1 waiting, 2 blocked.
#[tokio::test]
async fn test_flow_status_endpoint() {
    let (base, _qm) = start_test_server().await;
    let client = Client::new();
    let flow_id = "test-flow-42";

    let root_spec = json!({
        "name": "flow-a",
        "data": {},
        "flow_id": flow_id
    });
    let a_id = push_job(&client, &base, "flow-q", root_spec).await;

    let left_spec = json!({
        "name": "flow-b",
        "data": {},
        "depends_on": [a_id],
        "flow_id": flow_id
    });
    let _b_id = push_job(&client, &base, "flow-q", left_spec).await;

    let right_spec = json!({
        "name": "flow-c",
        "data": {},
        "depends_on": [a_id],
        "flow_id": flow_id
    });
    let _c_id = push_job(&client, &base, "flow-q", right_spec).await;

    let url = format!("{base}/api/v1/flows/{flow_id}");
    let response = client.get(url).send().await.unwrap();
    assert_eq!(response.status(), 200);

    let payload: Value = response.json().await.unwrap();
    assert_eq!(payload["ok"], true);
    assert_eq!(payload["flow_id"], flow_id);

    let flow_jobs = payload["jobs"]
        .as_array()
        .expect("should have jobs array");
    assert_eq!(flow_jobs.len(), 3, "flow should contain 3 jobs");

    let counts = &payload["summary"];
    assert_eq!(counts["total"], 3);
    assert_eq!(counts["waiting"], 1);
    assert_eq!(counts["blocked"], 2);
}
/// A push whose `depends_on` names a freshly generated UUID (never pushed to
/// this server) is not accepted with 201.
#[tokio::test]
async fn test_nonexistent_dep_rejected() {
    let (base, _qm) = start_test_server().await;
    let client = Client::new();

    // Generated locally, so no job with this id was ever pushed.
    let fake_id = uuid::Uuid::now_v7().to_string();
    let orphan_spec = json!({
        "name": "orphan-child",
        "data": {},
        "depends_on": [fake_id]
    });

    let url = format!("{base}/api/v1/queues/reject/jobs");
    let response = client.post(url).json(&orphan_spec).send().await.unwrap();

    assert_ne!(
        response.status(),
        201,
        "should reject job with non-existent dependency"
    );
}