rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
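//! Integration tests for `POST /jobs` idempotency: concurrent POSTs with the
//! same `Idempotency-Key` collapse to one row and return the same job id,
//! distinct keys create distinct jobs, and the `Idempotency-Key` header wins
//! over the body's `idempotency_key` field.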

#![cfg(feature = "api")]

mod common;

use serde_json::{json, Value};
use sqlx::query_as::query_as;

/// Two POSTs racing on the same `Idempotency-Key` must both succeed, report
/// the same job id, and leave exactly one row in `jobs` for that key.
#[tokio::test]
async fn two_concurrent_posts_with_same_key_return_same_job() {
    let pool = common::fresh_pool().await;
    let server = common::spawn_api(pool.clone()).await;
    let client = reqwest::Client::new();
    let key = "user-1-welcome";
    let base_url = server.base_url.clone();

    // Builds one POST /jobs future; each call yields an independent request
    // carrying the same idempotency key and payload.
    let post = || {
        let c = client.clone();
        let url = format!("{base_url}/jobs");
        async move {
            c.post(url)
                .header("Idempotency-Key", key)
                .json(&json!({
                    "kind": "send_email",
                    "payload": {"to": "a@b.c", "subject": "hi", "body": "hello"},
                }))
                .send()
                .await
                .expect("post")
        }
    };

    // Drive both requests concurrently so they overlap on the server.
    let (r1, r2) = tokio::join!(post(), post());

    // Assert on the status codes *before* decoding the bodies: an error
    // response may not be JSON at all, and a decode panic would hide the
    // status that explains the failure.
    let s1 = r1.status().as_u16();
    let s2 = r2.status().as_u16();
    assert!(s1 == 200 || s1 == 201, "first status was {s1}");
    assert!(s2 == 200 || s2 == 201, "second status was {s2}");

    let b1: Value = r1.json().await.expect("first response body is not JSON");
    let b2: Value = r2.json().await.expect("second response body is not JSON");

    // Require a real string id on both sides. Comparing the raw `Value`s
    // would pass vacuously (`Null == Null`) if both bodies lacked `id`.
    let id1 = b1["id"]
        .as_str()
        .unwrap_or_else(|| panic!("first response has no string `id`: {b1}"));
    let id2 = b2["id"]
        .as_str()
        .unwrap_or_else(|| panic!("second response has no string `id`: {b2}"));
    assert_eq!(
        id1, id2,
        "same idempotency key must return the same job id"
    );

    let count: (i64,) = query_as("SELECT COUNT(*) FROM jobs WHERE idempotency_key = $1")
        .bind(key)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(
        count.0, 1,
        "exactly one row regardless of how many duplicate POSTs"
    );

    server.shutdown().await;
}

/// Two POSTs with distinct `Idempotency-Key` values must each create their
/// own job and therefore report different ids.
#[tokio::test]
async fn different_keys_produce_different_jobs() {
    let pool = common::fresh_pool().await;
    let server = common::spawn_api(pool.clone()).await;
    let client = reqwest::Client::new();

    // Builds a POST /jobs future for the given idempotency key. The key is
    // copied into an owned `String` so the future does not borrow the argument.
    let post = |key: &str| {
        let c = client.clone();
        let key = key.to_string();
        let base = server.base_url.clone();
        async move {
            c.post(format!("{base}/jobs"))
                .header("Idempotency-Key", &key)
                .json(&json!({
                    "kind": "send_email",
                    "payload": {"to": "a@b.c", "subject": "hi", "body": "hello"},
                }))
                .send()
                .await
                .unwrap()
        }
    };

    // Both keys are new on a fresh pool, so each request must create a job.
    // `header_takes_precedence_over_body_field` in this file expects 201 for
    // a first-time key; the same status is required here. Without a status
    // check, one failed request (no `id` in its body) would still satisfy
    // the inequality below.
    let res1 = post("k1").await;
    assert_eq!(res1.status(), 201, "POST with key k1 did not create a job");
    let res2 = post("k2").await;
    assert_eq!(res2.status(), 201, "POST with key k2 did not create a job");

    let b1: Value = res1.json().await.unwrap();
    let b2: Value = res2.json().await.unwrap();

    // Compare real string ids rather than raw `Value`s so a missing `id`
    // (which indexes to `Null`) cannot masquerade as "different".
    let id1 = b1["id"]
        .as_str()
        .unwrap_or_else(|| panic!("k1 response has no string `id`: {b1}"));
    let id2 = b2["id"]
        .as_str()
        .unwrap_or_else(|| panic!("k2 response has no string `id`: {b2}"));
    assert_ne!(id1, id2, "different keys must not share a job id");

    server.shutdown().await;
}

/// `N` POSTs spawned as separate tasks with one shared `Idempotency-Key`
/// must all succeed, all report the same job id, and leave exactly one row.
#[tokio::test]
async fn n_concurrent_posts_with_same_key_collapse_to_one_row() {
    // A higher-N stress check on the idempotency-race semantics. The
    // 2-concurrent test above checks the happy-path collapse; this one
    // exercises the actual contention on the partial unique index across
    // many simultaneous inserters.
    const N: usize = 10;

    let pool = common::fresh_pool().await;
    let server = common::spawn_api(pool.clone()).await;
    let client = reqwest::Client::new();
    let key = "stress-race-key";
    let base_url = server.base_url.clone();

    // Spawn every request up front so they are all in flight before the
    // first one is awaited.
    let mut tasks = Vec::with_capacity(N);
    for _ in 0..N {
        let c = client.clone();
        let url = format!("{base_url}/jobs");
        let key_owned = key.to_string();
        tasks.push(tokio::spawn(async move {
            c.post(url)
                .header("Idempotency-Key", &key_owned)
                .json(&json!({
                    "kind": "send_email",
                    "payload": {"to": "a@b.c", "subject": "hi", "body": "hello"},
                }))
                .send()
                .await
                .expect("post")
        }));
    }

    let mut ids = Vec::with_capacity(N);
    for (i, t) in tasks.into_iter().enumerate() {
        let res = t.await.expect("task panicked");

        // Every response must be 200 or 201, no 4xx/5xx errors. Checked
        // before the body is decoded so that an error reply is reported by
        // its status rather than by a JSON / missing-`id` panic.
        let code = res.status().as_u16();
        assert!(code == 200 || code == 201, "got unexpected status {code}");

        let body: Value = res.json().await.expect("json");
        let id = body["id"]
            .as_str()
            .unwrap_or_else(|| panic!("response {i} has no string `id`: {body}"));
        ids.push(id.to_string());
    }
    assert_eq!(ids.len(), N);

    // Every response must report the same job id.
    let first = &ids[0];
    assert!(
        ids.iter().all(|id| id == first),
        "ids diverged across {N} concurrent posts: {ids:?}"
    );

    // The DB has exactly one row for this key.
    let count: (i64,) = query_as("SELECT COUNT(*) FROM jobs WHERE idempotency_key = $1")
        .bind(key)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(count.0, 1, "expected exactly one row for the shared key");

    server.shutdown().await;
}

/// When a request carries both the `Idempotency-Key` header and a body-level
/// `idempotency_key`, the header value is the one that identifies the job.
#[tokio::test]
async fn header_takes_precedence_over_body_field() {
    let pool = common::fresh_pool().await;
    let server = common::spawn_api(pool.clone()).await;
    let client = reqwest::Client::new();
    let jobs_url = format!("{}/jobs", server.base_url);

    // Both requests send header "alpha"; only the body-level key differs.
    // The request is assembled eagerly and sent when the future is awaited.
    let submit = |body_key: &str| {
        let request = client
            .post(jobs_url.as_str())
            .header("Idempotency-Key", "alpha")
            .json(&json!({
                "kind": "send_email",
                "payload": {"to": "a@b.c", "subject": "hi", "body": "hello"},
                "idempotency_key": body_key,
            }));
        async move { request.send().await.unwrap() }
    };

    // First request: body key "beta" is ignored because the header wins, so
    // the job is created under "alpha".
    let created = submit("beta").await;
    assert_eq!(created.status(), 201);
    let created_body: Value = created.json().await.unwrap();
    let original_id = created_body["id"].as_str().unwrap().to_string();
    assert_eq!(created_body["idempotency_key"], "alpha");

    // Second request: same header, body key "gamma". It must resolve to the
    // job created above rather than creating a new one.
    let replayed = submit("gamma").await;
    assert_eq!(replayed.status(), 200);
    let replayed_body: Value = replayed.json().await.unwrap();
    assert_eq!(replayed_body["id"].as_str().unwrap(), original_id);

    server.shutdown().await;
}