#![cfg(feature = "api")]
mod common;
use serde_json::{json, Value};
use sqlx::query_as::query_as;
/// Sends two `POST /jobs` requests concurrently with the same
/// `Idempotency-Key` header and checks that:
///
/// * both responses have status 200 or 201,
/// * both responses carry the same string `id`,
/// * the `jobs` table holds exactly one row for that key.
#[tokio::test]
async fn two_concurrent_posts_with_same_key_return_same_job() {
    let pool = common::fresh_pool().await;
    let server = common::spawn_api(pool.clone()).await;
    let client = reqwest::Client::new();
    let key = "user-1-welcome";
    let base_url = server.base_url.clone();

    // Each call builds an independent request future so the two can be
    // driven together by `tokio::join!`.
    let post = || {
        let c = client.clone();
        let url = format!("{base_url}/jobs");
        async move {
            c.post(url)
                .header("Idempotency-Key", key)
                .json(&json!({
                    "kind": "send_email",
                    "payload": {"to": "a@b.c", "subject": "hi", "body": "hello"},
                }))
                .send()
                .await
                .expect("post")
        }
    };
    let (r1, r2) = tokio::join!(post(), post());

    // Validate the statuses before decoding the bodies: an error response
    // should be reported by its status, not as a JSON decode panic.
    let s1 = r1.status().as_u16();
    let s2 = r2.status().as_u16();
    assert!(s1 == 200 || s1 == 201, "first status was {s1}");
    assert!(s2 == 200 || s2 == 201, "second status was {s2}");

    let b1: Value = r1.json().await.expect("first response body is not JSON");
    let b2: Value = r2.json().await.expect("second response body is not JSON");

    // Indexing a `Value` with a missing key yields `Null`, so comparing the
    // raw values would pass when *both* responses lack an `id`. Require a
    // real string id on each side first.
    let id1 = b1["id"]
        .as_str()
        .unwrap_or_else(|| panic!("first response has no string `id`: {b1}"));
    let id2 = b2["id"]
        .as_str()
        .unwrap_or_else(|| panic!("second response has no string `id`: {b2}"));
    assert_eq!(
        id1, id2,
        "same idempotency key must return the same job id"
    );

    let count: (i64,) = query_as("SELECT COUNT(*) FROM jobs WHERE idempotency_key = $1")
        .bind(key)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(
        count.0, 1,
        "exactly one row regardless of how many duplicate POSTs"
    );
    server.shutdown().await;
}
/// Posts two jobs, one after the other, with distinct `Idempotency-Key`
/// headers (`k1`, `k2`) and checks that each request succeeds (200 or 201)
/// and that the two responses carry different string `id`s.
#[tokio::test]
async fn different_keys_produce_different_jobs() {
    let pool = common::fresh_pool().await;
    let server = common::spawn_api(pool.clone()).await;
    let client = reqwest::Client::new();

    // Posts one job under `key` and returns the job id from the response.
    let post = |key: &str| {
        let c = client.clone();
        let key = key.to_string();
        let base = server.base_url.clone();
        async move {
            let res = c
                .post(format!("{base}/jobs"))
                .header("Idempotency-Key", &key)
                .json(&json!({
                    "kind": "send_email",
                    "payload": {"to": "a@b.c", "subject": "hi", "body": "hello"},
                }))
                .send()
                .await
                .unwrap();

            // Without this check a failed request would go unnoticed: its
            // body has no `id`, which indexes to `Null` and trivially
            // differs from the other request's real id.
            let status = res.status().as_u16();
            assert!(
                status == 200 || status == 201,
                "status for key {key} was {status}"
            );

            let body: Value = res.json().await.unwrap();
            body["id"]
                .as_str()
                .unwrap_or_else(|| panic!("response for key {key} has no string `id`: {body}"))
                .to_string()
        }
    };

    let id1 = post("k1").await;
    let id2 = post("k2").await;
    assert_ne!(id1, id2, "different idempotency keys must produce different job ids");
    server.shutdown().await;
}
/// Spawns `N` tasks that each `POST /jobs` with the same `Idempotency-Key`
/// and checks that:
///
/// * every response has status 200 or 201,
/// * every response carries the same string `id`,
/// * the `jobs` table holds exactly one row for the shared key.
#[tokio::test]
async fn n_concurrent_posts_with_same_key_collapse_to_one_row() {
    const N: usize = 10;
    let pool = common::fresh_pool().await;
    let server = common::spawn_api(pool.clone()).await;
    let client = reqwest::Client::new();
    let key = "stress-race-key";
    let base_url = server.base_url.clone();

    // Spawned tasks must own everything they use, hence the per-task clones.
    let mut tasks = Vec::with_capacity(N);
    for _ in 0..N {
        let c = client.clone();
        let url = format!("{base_url}/jobs");
        let key_owned = key.to_string();
        tasks.push(tokio::spawn(async move {
            c.post(url)
                .header("Idempotency-Key", &key_owned)
                .json(&json!({
                    "kind": "send_email",
                    "payload": {"to": "a@b.c", "subject": "hi", "body": "hello"},
                }))
                .send()
                .await
                .expect("post")
        }));
    }

    let mut ids = Vec::with_capacity(N);
    for (i, t) in tasks.into_iter().enumerate() {
        let res = t.await.expect("task panicked");

        // Check the status before touching the body so an error response is
        // reported by its status rather than as a JSON/`id` extraction panic.
        let code = res.status().as_u16();
        assert!(code == 200 || code == 201, "got unexpected status {code}");

        let body: Value = res.json().await.expect("json");
        let id = body["id"]
            .as_str()
            .unwrap_or_else(|| panic!("response {i} has no string `id`: {body}"));
        ids.push(id.to_string());
    }
    assert_eq!(ids.len(), N);

    let first = &ids[0];
    assert!(
        ids.iter().all(|id| id == first),
        "ids diverged across {N} concurrent posts: {ids:?}"
    );

    let count: (i64,) = query_as("SELECT COUNT(*) FROM jobs WHERE idempotency_key = $1")
        .bind(key)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(count.0, 1, "expected exactly one row for the shared key");
    server.shutdown().await;
}
/// Submits the same job twice with the `Idempotency-Key` header set to
/// `alpha` while the JSON body carries a different `idempotency_key` each
/// time (`beta`, then `gamma`).
///
/// The first response must be 201 and report `idempotency_key == "alpha"`;
/// the second must be 200 and return the id of the first job.
#[tokio::test]
async fn header_takes_precedence_over_body_field() {
    let pool = common::fresh_pool().await;
    let server = common::spawn_api(pool.clone()).await;
    let http = reqwest::Client::new();
    let jobs_url = format!("{}/jobs", server.base_url);

    // First submission: header says "alpha", body says "beta".
    let first_payload = json!({
        "kind": "send_email",
        "payload": {"to": "a@b.c", "subject": "hi", "body": "hello"},
        "idempotency_key": "beta",
    });
    let created = http
        .post(jobs_url.as_str())
        .header("Idempotency-Key", "alpha")
        .json(&first_payload)
        .send()
        .await
        .unwrap();
    assert_eq!(created.status(), 201);
    let created_body: Value = created.json().await.unwrap();
    let original_id = created_body["id"].as_str().unwrap().to_string();
    assert_eq!(created_body["idempotency_key"], "alpha");

    // Second submission: same header, body now says "gamma".
    let replay_payload = json!({
        "kind": "send_email",
        "payload": {"to": "a@b.c", "subject": "hi", "body": "hello"},
        "idempotency_key": "gamma",
    });
    let replayed = http
        .post(jobs_url.as_str())
        .header("Idempotency-Key", "alpha")
        .json(&replay_payload)
        .send()
        .await
        .unwrap();
    assert_eq!(replayed.status(), 200);
    let replayed_body: Value = replayed.json().await.unwrap();
    assert_eq!(replayed_body["id"].as_str().unwrap(), original_id);

    server.shutdown().await;
}