use axum::{
body::Body,
http::{Request, StatusCode},
};
use qrusty::api::ApiServer;
use qrusty::storage::Storage;
use serde_json::{json, Value};
use std::sync::Arc;
use tempfile::TempDir;
use tower::ServiceExt;
async fn create_test_api() -> (ApiServer, TempDir) {
let temp_dir = TempDir::new().expect("Failed to create temp directory");
let storage =
Storage::new(temp_dir.path().to_str().unwrap()).expect("Failed to create test storage");
let api = ApiServer::new(Arc::new(storage));
(api, temp_dir)
}
async fn make_request(
api: ApiServer,
method: &str,
path: &str,
body: Option<Value>,
) -> (StatusCode, Value) {
let app = api.router();
let request_builder = Request::builder()
.method(method)
.uri(path)
.header("content-type", "application/json");
let request = if let Some(body_json) = body {
request_builder
.body(Body::from(body_json.to_string()))
.unwrap()
} else {
request_builder.body(Body::empty()).unwrap()
};
let response = app.oneshot(request).await.unwrap();
let status = response.status();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8(body.to_vec()).unwrap();
let json_body: Value = if body_str.is_empty() {
Value::Null
} else {
serde_json::from_str(&body_str).unwrap_or(Value::String(body_str))
};
(status, json_body)
}
#[tokio::test]
async fn test_health_endpoint() {
let (api, _temp_dir) = create_test_api().await;
let (status, body) = make_request(api, "GET", "/health", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["status"], Value::String("ok".to_string()));
}
#[tokio::test]
async fn test_stats_endpoint() {
let (api, _temp_dir) = create_test_api().await;
let (status, body) = make_request(api.clone(), "GET", "/stats", None).await;
assert_eq!(status, StatusCode::OK);
assert!(body.is_object());
assert_eq!(body["queues"].as_array().unwrap().len(), 0);
assert_eq!(body["summary"]["total_queues"], 0);
assert_eq!(body["summary"]["total_messages"], 0);
let publish_request1 = json!({
"queue": "orders",
"priority": 100,
"payload": "{\"order_id\": 123}"
});
make_request(api.clone(), "POST", "/publish", Some(publish_request1)).await;
let publish_request2 = json!({
"queue": "orders",
"priority": 50,
"payload": "{\"order_id\": 124}"
});
make_request(api.clone(), "POST", "/publish", Some(publish_request2)).await;
let publish_request3 = json!({
"queue": "notifications",
"priority": 200,
"payload": "{\"user_id\": 456}"
});
make_request(api.clone(), "POST", "/publish", Some(publish_request3)).await;
let (status, body) = make_request(api.clone(), "GET", "/stats", None).await;
assert_eq!(status, StatusCode::OK);
assert!(body.is_object());
let queues = body["queues"].as_array().unwrap();
assert_eq!(queues.len(), 2);
assert_eq!(body["summary"]["total_queues"], 2);
assert_eq!(body["summary"]["total_messages"], 3);
assert_eq!(body["summary"]["total_available"], 3);
assert_eq!(body["summary"]["total_locked"], 0);
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
make_request(
api.clone(),
"POST",
"/consume/orders",
Some(consume_request),
)
.await;
let (status, body) = make_request(api.clone(), "GET", "/stats", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["summary"]["total_messages"], 3);
assert_eq!(body["summary"]["total_available"], 2);
assert_eq!(body["summary"]["total_locked"], 1);
}
#[tokio::test]
async fn test_publish_message() {
let (api, _temp_dir) = create_test_api().await;
let publish_request = json!({
"queue": "test_queue",
"priority": 100,
"payload": "{\"test\": \"data\"}",
"max_retries": 3
});
let (status, body) = make_request(api, "POST", "/publish", Some(publish_request)).await;
assert_eq!(status, StatusCode::OK);
assert!(body.is_object());
assert!(body["id"].is_string());
assert!(!body["id"].as_str().unwrap().is_empty());
}
#[tokio::test]
async fn test_publish_message_minimal() {
let (api, _temp_dir) = create_test_api().await;
let publish_request = json!({
"queue": "test_queue",
"priority": 50,
"payload": "{\"minimal\": true}"
});
let (status, body) = make_request(api, "POST", "/publish", Some(publish_request)).await;
assert_eq!(status, StatusCode::OK);
assert!(body["id"].is_string());
}
#[tokio::test]
async fn test_webui_served_from_configurable_directory() {
let (api, _temp_dir) = create_test_api().await;
let webui_dir = TempDir::new().expect("Failed to create WEBUI_DIR temp dir");
let assets_dir = webui_dir.path().join("assets");
std::fs::create_dir_all(&assets_dir).expect("Failed to create assets dir");
std::fs::write(webui_dir.path().join("index.html"), "<html>ok</html>")
.expect("Failed to write index.html");
std::fs::write(assets_dir.join("test.txt"), "asset-ok").expect("Failed to write test asset");
std::env::set_var("WEBUI_DIR", webui_dir.path());
let (status, body) = make_request(api.clone(), "GET", "/ui", None).await;
assert_eq!(status, StatusCode::OK);
assert!(body.as_str().unwrap_or("").contains("<html>"));
let (status, body) = make_request(api, "GET", "/assets/test.txt", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body, Value::String("asset-ok".to_string()));
}
#[tokio::test]
async fn test_publish_and_consume() {
let (api, _temp_dir) = create_test_api().await;
let publish_request = json!({
"queue": "test_queue",
"priority": 100,
"payload": "{\"order_id\": 12345}",
"max_retries": 3
});
let (status, publish_body) =
make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
assert_eq!(status, StatusCode::OK);
let message_id = publish_body["id"].as_str().unwrap();
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
let (status, consume_body) =
make_request(api, "POST", "/consume/test_queue", Some(consume_request)).await;
assert_eq!(status, StatusCode::OK);
assert!(consume_body.is_object());
assert_eq!(consume_body["id"].as_str().unwrap(), message_id);
assert_eq!(
consume_body["payload"].as_str().unwrap(),
"{\"order_id\": 12345}"
);
assert_eq!(consume_body["retry_count"].as_u64().unwrap(), 1);
}
#[tokio::test]
async fn test_consume_empty_queue() {
let (api, _temp_dir) = create_test_api().await;
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
let (status, body) =
make_request(api, "POST", "/consume/empty_queue", Some(consume_request)).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body, Value::Null);
}
#[tokio::test]
async fn test_consume_with_default_timeout() {
let (api, _temp_dir) = create_test_api().await;
let publish_request = json!({
"queue": "test_queue",
"priority": 100,
"payload": "{\"test\": true}"
});
make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
let consume_request = json!({
"consumer_id": "worker-1"
});
let (status, body) =
make_request(api, "POST", "/consume/test_queue", Some(consume_request)).await;
assert_eq!(status, StatusCode::OK);
assert!(body.is_object());
assert!(!body["id"].as_str().unwrap().is_empty());
}
#[tokio::test]
async fn test_ack_message() {
let (api, _temp_dir) = create_test_api().await;
let publish_request = json!({
"queue": "test_queue",
"priority": 100,
"payload": "{\"test\": true}"
});
let (_, publish_body) =
make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
let message_id = publish_body["id"].as_str().unwrap();
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
make_request(
api.clone(),
"POST",
"/consume/test_queue",
Some(consume_request),
)
.await;
let ack_request = json!({
"consumer_id": "worker-1"
});
let ack_path = format!("/ack/test_queue/{}", message_id);
let (status, _) = make_request(api.clone(), "POST", &ack_path, Some(ack_request)).await;
assert_eq!(status, StatusCode::OK);
let consume_request2 = json!({
"consumer_id": "worker-2",
"timeout_seconds": 30
});
let (status, body) =
make_request(api, "POST", "/consume/test_queue", Some(consume_request2)).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body, Value::Null);
}
#[tokio::test]
async fn test_ack_wrong_consumer() {
let (api, _temp_dir) = create_test_api().await;
let publish_request = json!({
"queue": "test_queue",
"priority": 100,
"payload": "{\"test\": true}"
});
let (_, publish_body) =
make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
let message_id = publish_body["id"].as_str().unwrap();
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
make_request(
api.clone(),
"POST",
"/consume/test_queue",
Some(consume_request),
)
.await;
let ack_request = json!({
"consumer_id": "worker-2"
});
let ack_path = format!("/ack/test_queue/{}", message_id);
let (status, _) = make_request(api, "POST", &ack_path, Some(ack_request)).await;
assert_eq!(status, StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_nack_message() {
let (api, _temp_dir) = create_test_api().await;
let publish_request = json!({
"queue": "test_queue",
"priority": 100,
"payload": "{\"test\": true}"
});
let (_, publish_body) =
make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
let message_id = publish_body["id"].as_str().unwrap();
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
make_request(
api.clone(),
"POST",
"/consume/test_queue",
Some(consume_request),
)
.await;
let nack_request = json!({
"consumer_id": "worker-1"
});
let nack_path = format!("/nack/test_queue/{}", message_id);
let (status, _) = make_request(api.clone(), "POST", &nack_path, Some(nack_request)).await;
assert_eq!(status, StatusCode::OK);
let consume_request2 = json!({
"consumer_id": "worker-2",
"timeout_seconds": 30
});
let (status, body) =
make_request(api, "POST", "/consume/test_queue", Some(consume_request2)).await;
assert_eq!(status, StatusCode::OK);
assert!(body.is_object());
assert_eq!(body["id"].as_str().unwrap(), message_id);
assert_eq!(body["retry_count"].as_u64().unwrap(), 2); }
#[tokio::test]
async fn test_nack_wrong_consumer() {
let (api, _temp_dir) = create_test_api().await;
let publish_request = json!({
"queue": "test_queue",
"priority": 100,
"payload": "{\"test\": true}"
});
let (_, publish_body) =
make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
let message_id = publish_body["id"].as_str().unwrap();
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
make_request(
api.clone(),
"POST",
"/consume/test_queue",
Some(consume_request),
)
.await;
let nack_request = json!({
"consumer_id": "worker-2"
});
let nack_path = format!("/nack/test_queue/{}", message_id);
let (status, _) = make_request(api, "POST", &nack_path, Some(nack_request)).await;
assert_eq!(status, StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_priority_ordering_through_api() {
let (api, _temp_dir) = create_test_api().await;
let messages = vec![
(10, "low priority"),
(100, "high priority"),
(50, "medium priority"),
];
for (priority, payload) in &messages {
let publish_request = json!({
"queue": "priority_test",
"priority": priority,
"payload": format!("{{\"message\": \"{}\"}}", payload)
});
make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
}
let expected_order = vec!["high priority", "medium priority", "low priority"];
for expected_message in expected_order {
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/priority_test",
Some(consume_request),
)
.await;
assert_eq!(status, StatusCode::OK);
let payload = body["payload"].as_str().unwrap();
assert!(payload.contains(expected_message));
}
}
#[tokio::test]
async fn test_queue_isolation_through_api() {
let (api, _temp_dir) = create_test_api().await;
let queue1_request = json!({
"queue": "queue1",
"priority": 100,
"payload": "{\"queue\": \"queue1\"}"
});
let queue2_request = json!({
"queue": "queue2",
"priority": 100,
"payload": "{\"queue\": \"queue2\"}"
});
make_request(api.clone(), "POST", "/publish", Some(queue1_request)).await;
make_request(api.clone(), "POST", "/publish", Some(queue2_request)).await;
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/queue1",
Some(consume_request.clone()),
)
.await;
assert_eq!(status, StatusCode::OK);
assert!(body["payload"].as_str().unwrap().contains("queue1"));
let consume_request2 = json!({
"consumer_id": "worker-2",
"timeout_seconds": 30
});
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/queue2",
Some(consume_request2.clone()),
)
.await;
assert_eq!(status, StatusCode::OK);
assert!(body["payload"].as_str().unwrap().contains("queue2"));
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/queue1",
Some(consume_request),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body, Value::Null);
let (status, body) = make_request(api, "POST", "/consume/queue2", Some(consume_request2)).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body, Value::Null);
}
#[tokio::test]
async fn test_invalid_json_handling() {
let (_api, _temp_dir) = create_test_api().await;
}
#[tokio::test]
async fn test_concurrent_api_operations() {
let (api, _temp_dir) = create_test_api().await;
let api = Arc::new(api);
let mut publish_handles = vec![];
for i in 0..10 {
let api_clone = api.clone();
let handle = tokio::spawn(async move {
let publish_request = json!({
"queue": "concurrent_test",
"priority": i * 10,
"payload": format!("{{\"message_number\": {}}}", i)
});
make_request(
(*api_clone).clone(),
"POST",
"/publish",
Some(publish_request),
)
.await
});
publish_handles.push(handle);
}
for handle in publish_handles {
let (status, _) = handle.await.unwrap();
assert_eq!(status, StatusCode::OK);
}
let mut consume_handles = vec![];
for i in 0..10 {
let api_clone = api.clone();
let handle = tokio::spawn(async move {
let consume_request = json!({
"consumer_id": format!("worker-{}", i),
"timeout_seconds": 30
});
make_request(
(*api_clone).clone(),
"POST",
"/consume/concurrent_test",
Some(consume_request),
)
.await
});
consume_handles.push(handle);
}
let mut consumed_messages = vec![];
for handle in consume_handles {
let (status, body) = handle.await.unwrap();
if status == StatusCode::OK && body != Value::Null {
consumed_messages.push(body);
}
}
assert_eq!(consumed_messages.len(), 10);
}
#[tokio::test]
async fn test_create_queue_max_first() {
let (api, _temp_dir) = create_test_api().await;
let create_request = json!({
"name": "urgent_queue",
"config": {
"ordering": "MaxFirst"
}
});
let (status, _body) =
make_request(api.clone(), "POST", "/create-queue", Some(create_request)).await;
assert_eq!(status, StatusCode::OK);
let publish_low = json!({
"queue": "urgent_queue",
"priority": 10,
"payload": "low priority"
});
let publish_high = json!({
"queue": "urgent_queue",
"priority": 100,
"payload": "high priority"
});
make_request(api.clone(), "POST", "/publish", Some(publish_low)).await;
make_request(api.clone(), "POST", "/publish", Some(publish_high)).await;
let consume_request = json!({
"consumer_id": "test_consumer",
"timeout_seconds": 30
});
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/urgent_queue",
Some(consume_request.clone()),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["payload"], "high priority");
let (status, body) =
make_request(api, "POST", "/consume/urgent_queue", Some(consume_request)).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["payload"], "low priority");
}
#[tokio::test]
async fn test_create_queue_min_first() {
let (api, _temp_dir) = create_test_api().await;
let create_request = json!({
"name": "batch_queue",
"config": {
"ordering": "MinFirst"
}
});
let (status, _body) =
make_request(api.clone(), "POST", "/create-queue", Some(create_request)).await;
assert_eq!(status, StatusCode::OK);
let publish_low = json!({
"queue": "batch_queue",
"priority": 10,
"payload": "low priority"
});
let publish_high = json!({
"queue": "batch_queue",
"priority": 100,
"payload": "high priority"
});
make_request(api.clone(), "POST", "/publish", Some(publish_low)).await;
make_request(api.clone(), "POST", "/publish", Some(publish_high)).await;
let consume_request = json!({
"consumer_id": "test_consumer",
"timeout_seconds": 30
});
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/batch_queue",
Some(consume_request.clone()),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["payload"], "low priority");
let (status, body) =
make_request(api, "POST", "/consume/batch_queue", Some(consume_request)).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["payload"], "high priority");
}
#[tokio::test]
async fn test_create_queue_conflict() {
let (api, _temp_dir) = create_test_api().await;
let create_request = json!({
"name": "duplicate_test",
"config": {
"ordering": "MaxFirst"
}
});
let (status, _body) = make_request(
api.clone(),
"POST",
"/create-queue",
Some(create_request.clone()),
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, _body) =
make_request(api.clone(), "POST", "/create-queue", Some(create_request)).await;
assert_eq!(status, StatusCode::CONFLICT);
let update_request = json!({
"name": "duplicate_test",
"config": {
"allow_duplicates": false
}
});
let (status, _body) =
make_request(api.clone(), "POST", "/update-queue", Some(update_request)).await;
assert_eq!(status, StatusCode::OK);
let (status, stats) = make_request(api, "GET", "/queue-stats/duplicate_test", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(stats["config"]["allow_duplicates"], false);
}
#[tokio::test]
async fn test_queue_stats_endpoint() {
let (api, _temp_dir) = create_test_api().await;
let publish_request1 = json!({
"queue": "stats_queue",
"priority": 100,
"payload": "a"
});
make_request(api.clone(), "POST", "/publish", Some(publish_request1)).await;
let publish_request2 = json!({
"queue": "stats_queue",
"priority": 50,
"payload": "b"
});
make_request(api.clone(), "POST", "/publish", Some(publish_request2)).await;
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
make_request(
api.clone(),
"POST",
"/consume/stats_queue",
Some(consume_request),
)
.await;
let (status, body) = make_request(api, "GET", "/queue-stats/stats_queue", None).await;
assert_eq!(status, StatusCode::OK);
assert!(body.is_object());
assert_eq!(body["available"], 1);
assert_eq!(body["locked"], 1);
assert_eq!(body["total"], 2);
}
#[tokio::test]
async fn test_queue_stats_includes_live_rate_fields() {
let (api, _temp_dir) = create_test_api().await;
let (status, body) = make_request(api.clone(), "GET", "/queue-stats/rate_queue", None).await;
assert_eq!(status, StatusCode::OK);
assert!(body.get("publish_rate_per_sec").is_some());
assert!(body.get("ack_rate_per_sec").is_some());
let publish_request = json!({
"queue": "rate_queue",
"priority": 1,
"payload": "x"
});
make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
let mut publish_rate = 0.0;
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
while std::time::Instant::now() < deadline {
let (_status, body) =
make_request(api.clone(), "GET", "/queue-stats/rate_queue", None).await;
publish_rate = body["publish_rate_per_sec"].as_f64().unwrap_or(0.0);
if publish_rate > 0.0 {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
assert!(
publish_rate > 0.0,
"expected publish_rate_per_sec > 0 after activity, got {publish_rate}"
);
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
let (_status, consumed) = make_request(
api.clone(),
"POST",
"/consume/rate_queue",
Some(consume_request),
)
.await;
let msg_id = consumed["id"]
.as_str()
.expect("expected consumed message id");
let ack_request = json!({
"consumer_id": "worker-1"
});
make_request(
api.clone(),
"POST",
&format!("/ack/rate_queue/{msg_id}"),
Some(ack_request),
)
.await;
let mut ack_rate = 0.0;
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
while std::time::Instant::now() < deadline {
let (_status, body) =
make_request(api.clone(), "GET", "/queue-stats/rate_queue", None).await;
ack_rate = body["ack_rate_per_sec"].as_f64().unwrap_or(0.0);
if ack_rate > 0.0 {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
assert!(
ack_rate > 0.0,
"expected ack_rate_per_sec > 0 after activity, got {ack_rate}"
);
}
#[tokio::test]
async fn test_queue_metrics_endpoint_returns_time_series() {
let (api, _temp_dir) = create_test_api().await;
let publish_request = json!({
"queue": "metrics_queue",
"priority": 1,
"payload": "hello"
});
let (status, body) = make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
assert_eq!(status, StatusCode::OK, "publish failed: {body}");
let consume_request = json!({
"consumer_id": "worker-1",
"timeout_seconds": 30
});
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/metrics_queue",
Some(consume_request),
)
.await;
assert_eq!(status, StatusCode::OK, "consume failed: {body}");
let mut body = Value::Null;
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3);
while std::time::Instant::now() < deadline {
let (status, next_body) =
make_request(api.clone(), "GET", "/queues/metrics_queue/metrics", None).await;
assert_eq!(status, StatusCode::OK);
let points_len = next_body["points"]
.as_array()
.expect("expected points array")
.len();
body = next_body;
if points_len > 0 {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
assert_eq!(body["queue"], "metrics_queue");
assert_eq!(body["resolution_seconds"], 1);
assert_eq!(body["window_seconds"], 10);
let points = body["points"].as_array().expect("expected points array");
assert!(!points.is_empty(), "expected at least one sampled point");
let last = points.last().expect("non-empty points");
assert!(last["t_ms"].as_i64().is_some());
let total = last["total"].as_u64().unwrap_or(0);
let locked = last["locked"].as_u64().unwrap_or(0);
assert!(total >= 1);
assert!(locked <= total);
assert!(last["publish_rate_per_sec"].as_f64().is_some());
assert!(last["ack_rate_per_sec"].as_f64().is_some());
assert!(last["nack_rate_per_sec"].as_f64().is_some());
}
#[tokio::test]
async fn test_list_queues_endpoint() {
let (api, _temp_dir) = create_test_api().await;
let publish_orders = json!({
"queue": "orders",
"priority": 1,
"payload": "x"
});
make_request(api.clone(), "POST", "/publish", Some(publish_orders)).await;
let publish_notifications = json!({
"queue": "notifications",
"priority": 1,
"payload": "y"
});
make_request(api.clone(), "POST", "/publish", Some(publish_notifications)).await;
let (status, body) = make_request(api, "GET", "/queues", None).await;
assert_eq!(status, StatusCode::OK);
let queues = body.as_array().expect("Expected JSON array");
let queue_names: Vec<&str> = queues.iter().filter_map(|v| v.as_str()).collect();
assert!(queue_names.contains(&"orders"));
assert!(queue_names.contains(&"notifications"));
}
#[tokio::test]
async fn test_default_queue_behavior() {
let (api, _temp_dir) = create_test_api().await;
let publish_low = json!({
"queue": "default_queue",
"priority": 10,
"payload": "low priority"
});
let publish_high = json!({
"queue": "default_queue",
"priority": 100,
"payload": "high priority"
});
make_request(api.clone(), "POST", "/publish", Some(publish_low)).await;
make_request(api.clone(), "POST", "/publish", Some(publish_high)).await;
let consume_request = json!({
"consumer_id": "test_consumer",
"timeout_seconds": 30
});
let (status, body) =
make_request(api, "POST", "/consume/default_queue", Some(consume_request)).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["payload"], "high priority");
}
#[tokio::test]
async fn test_multiple_queue_configurations() {
let (api, _temp_dir) = create_test_api().await;
let create_urgent = json!({
"name": "urgent",
"config": {"ordering": "MaxFirst"}
});
let create_batch = json!({
"name": "batch",
"config": {"ordering": "MinFirst"}
});
make_request(api.clone(), "POST", "/create-queue", Some(create_urgent)).await;
make_request(api.clone(), "POST", "/create-queue", Some(create_batch)).await;
let urgent_msg = json!({
"queue": "urgent",
"priority": 50,
"payload": "urgent task"
});
let batch_msg = json!({
"queue": "batch",
"priority": 50,
"payload": "batch task"
});
make_request(api.clone(), "POST", "/publish", Some(urgent_msg)).await;
make_request(api.clone(), "POST", "/publish", Some(batch_msg)).await;
let consume_request = json!({
"consumer_id": "test_consumer",
"timeout_seconds": 30
});
let (status, urgent_body) = make_request(
api.clone(),
"POST",
"/consume/urgent",
Some(consume_request.clone()),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(urgent_body["payload"], "urgent task");
let (status, batch_body) =
make_request(api, "POST", "/consume/batch", Some(consume_request)).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(batch_body["payload"], "batch task");
}
#[tokio::test]
async fn test_priority_ordering_comprehensive() {
let (api, _temp_dir) = create_test_api().await;
let create_request = json!({
"name": "priority_test",
"config": {"ordering": "MinFirst"}
});
make_request(api.clone(), "POST", "/create-queue", Some(create_request)).await;
let priorities_and_payloads = vec![
(50, "medium"),
(10, "low"),
(100, "high"),
(1, "very_low"),
(90, "very_high"),
];
for (priority, payload) in &priorities_and_payloads {
let publish_request = json!({
"queue": "priority_test",
"priority": priority,
"payload": payload
});
make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
}
let consume_request = json!({
"consumer_id": "test_consumer",
"timeout_seconds": 30
});
let expected_order = vec!["very_low", "low", "medium", "very_high", "high"];
let mut consumed_payloads = vec![];
for _ in 0..5 {
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/priority_test",
Some(consume_request.clone()),
)
.await;
assert_eq!(status, StatusCode::OK);
consumed_payloads.push(body["payload"].as_str().unwrap().to_string());
}
assert_eq!(consumed_payloads, expected_order);
}
#[tokio::test]
async fn test_delete_queue_endpoint() {
let (api, _temp_dir) = create_test_api().await;
let create_request = json!({
"name": "delete_test",
"config": {
"ordering": "MaxFirst"
}
});
let (status, body) =
make_request(api.clone(), "POST", "/create-queue", Some(create_request)).await;
if status != StatusCode::OK {
panic!(
"Create queue failed with status {} and body: {}",
status, body
);
}
for i in 1..=3 {
let publish_request = json!({
"queue": "delete_test",
"priority": i * 100,
"payload": format!("message {}", i)
});
let (status, _body) =
make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
assert_eq!(status, StatusCode::OK);
}
let (stats_status, stats_body) = make_request(api.clone(), "GET", "/stats", None).await;
assert_eq!(stats_status, StatusCode::OK);
let queues = stats_body["queues"].as_array().unwrap();
let delete_test_queue = queues.iter().find(|q| q["name"] == "delete_test").unwrap();
assert_eq!(delete_test_queue["total"], 3);
let (delete_status, delete_body) =
make_request(api.clone(), "DELETE", "/delete-queue/delete_test", None).await;
if delete_status != StatusCode::OK {
panic!(
"Delete failed with status {} and body: {}",
delete_status, delete_body
);
}
assert_eq!(delete_body["queue"], "delete_test");
assert_eq!(delete_body["deleted_messages"], 3);
let (final_status, final_body) = make_request(api, "GET", "/stats", None).await;
assert_eq!(final_status, StatusCode::OK);
let queues = final_body["queues"].as_array().unwrap();
assert!(
!queues.iter().any(|q| q["name"] == "delete_test"),
"Queue should not exist after deletion"
);
}
#[tokio::test]
async fn test_purge_queue_endpoint() {
let (api, _temp_dir) = create_test_api().await;
let create_request = json!({
"name": "purge_test",
"config": {
"ordering": "MinFirst"
}
});
let (status, _body) =
make_request(api.clone(), "POST", "/create-queue", Some(create_request)).await;
assert_eq!(status, StatusCode::OK);
for i in 1..=4 {
let publish_request = json!({
"queue": "purge_test",
"priority": i * 50,
"payload": format!("message {}", i)
});
let (status, _body) =
make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
assert_eq!(status, StatusCode::OK);
}
let (stats_status, stats_body) = make_request(api.clone(), "GET", "/stats", None).await;
assert_eq!(stats_status, StatusCode::OK);
let queues = stats_body["queues"].as_array().unwrap();
let purge_test_queue = queues.iter().find(|q| q["name"] == "purge_test").unwrap();
assert_eq!(purge_test_queue["total"], 4);
let (purge_status, purge_body) =
make_request(api.clone(), "POST", "/purge-queue/purge_test", None).await;
assert_eq!(purge_status, StatusCode::OK);
assert_eq!(purge_body["queue"], "purge_test");
assert_eq!(purge_body["purged_messages"], 4);
let (empty_status, empty_body) = make_request(api.clone(), "GET", "/stats", None).await;
assert_eq!(empty_status, StatusCode::OK);
let queues = empty_body["queues"].as_array().unwrap();
let purge_test_queue = queues.iter().find(|q| q["name"] == "purge_test").unwrap();
assert_eq!(purge_test_queue["total"], 0);
let new_message_request = json!({
"queue": "purge_test",
"priority": 25,
"payload": "new message after purge"
});
let (status, _body) =
make_request(api.clone(), "POST", "/publish", Some(new_message_request)).await;
assert_eq!(status, StatusCode::OK);
let consume_request = json!({
"consumer_id": "test_consumer",
"timeout_seconds": 5
});
let (consume_status, consume_body) =
make_request(api, "POST", "/consume/purge_test", Some(consume_request)).await;
assert_eq!(consume_status, StatusCode::OK);
assert_eq!(consume_body["payload"], "new message after purge");
}
#[tokio::test]
async fn test_rename_queue_via_update() {
let (api, _temp_dir) = create_test_api().await;
let create_request = json!({
"name": "rename_from",
"config": { "ordering": "Fifo", "allow_duplicates": true }
});
let (status, body) =
make_request(api.clone(), "POST", "/create-queue", Some(create_request)).await;
assert_eq!(status, StatusCode::OK, "create failed: {}", body);
let publish_request = json!({
"queue": "rename_from",
"priority": 1,
"payload": "hello"
});
let (status, body) = make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
assert_eq!(status, StatusCode::OK, "publish failed: {}", body);
let update_request = json!({
"name": "rename_from",
"config": { "name": "rename_to" }
});
let (status, body) =
make_request(api.clone(), "POST", "/update-queue", Some(update_request)).await;
assert_eq!(status, StatusCode::OK, "update failed: {}", body);
let (stats_status, stats_body) = make_request(api.clone(), "GET", "/stats", None).await;
assert_eq!(stats_status, StatusCode::OK);
let queues = stats_body["queues"].as_array().unwrap();
assert!(queues.iter().any(|q| q["name"] == "rename_to"));
assert!(!queues.iter().any(|q| q["name"] == "rename_from"));
let consume_request = json!({
"consumer_id": "test_consumer",
"timeout_seconds": 5
});
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/rename_to",
Some(consume_request),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["payload"], "hello");
}
#[tokio::test]
async fn test_update_queue_allow_duplicates() {
let (api, _temp_dir) = create_test_api().await;
let create_request = json!({
"name": "test_queue",
"config": { "ordering": "Fifo", "allow_duplicates": true }
});
let (status, body) =
make_request(api.clone(), "POST", "/create-queue", Some(create_request)).await;
assert_eq!(status, StatusCode::OK, "create failed: {}", body);
let update_request = json!({
"name": "test_queue",
"config": { "allow_duplicates": false }
});
let (status, body) =
make_request(api.clone(), "POST", "/update-queue", Some(update_request)).await;
assert_eq!(status, StatusCode::OK, "update failed: {}", body);
let (status, stats) = make_request(api.clone(), "GET", "/queue-stats/test_queue", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(stats["config"]["allow_duplicates"], false);
assert_eq!(stats["config"]["ordering"], "Fifo"); }
#[tokio::test]
async fn test_update_queue_rename() {
let (api, _temp_dir) = create_test_api().await;
let create_request = json!({
"name": "old_name",
"config": { "ordering": "MaxFirst", "allow_duplicates": true }
});
let (status, body) =
make_request(api.clone(), "POST", "/create-queue", Some(create_request)).await;
assert_eq!(status, StatusCode::OK, "create failed: {}", body);
let publish_request = json!({
"queue": "old_name",
"priority": 1,
"payload": "test"
});
let (status, body) = make_request(api.clone(), "POST", "/publish", Some(publish_request)).await;
assert_eq!(status, StatusCode::OK, "publish failed: {}", body);
let update_request = json!({
"name": "old_name",
"config": { "name": "new_name" }
});
let (status, body) =
make_request(api.clone(), "POST", "/update-queue", Some(update_request)).await;
assert_eq!(status, StatusCode::OK, "update failed: {}", body);
let (status, stats_body) = make_request(api.clone(), "GET", "/stats", None).await;
assert_eq!(status, StatusCode::OK);
let queues = stats_body["queues"].as_array().unwrap();
assert!(queues.iter().any(|q| q["name"] == "new_name"));
assert!(!queues.iter().any(|q| q["name"] == "old_name"));
let (status, stats) = make_request(api.clone(), "GET", "/queue-stats/new_name", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(stats["config"]["ordering"], "MaxFirst"); assert_eq!(stats["config"]["allow_duplicates"], true);
let consume_request = json!({
"consumer_id": "test",
"timeout_seconds": 1
});
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/new_name",
Some(consume_request),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["payload"], "test");
}
#[tokio::test]
async fn test_update_queue_rename_and_allow_duplicates() {
let (api, _temp_dir) = create_test_api().await;
let create_request = json!({
"name": "queue_a",
"config": { "ordering": "MinFirst", "allow_duplicates": true }
});
let (status, body) =
make_request(api.clone(), "POST", "/create-queue", Some(create_request)).await;
assert_eq!(status, StatusCode::OK, "create failed: {}", body);
let update_request = json!({
"name": "queue_a",
"config": {
"name": "queue_b",
"allow_duplicates": false
}
});
let (status, body) =
make_request(api.clone(), "POST", "/update-queue", Some(update_request)).await;
assert_eq!(status, StatusCode::OK, "update failed: {}", body);
let (status, stats) = make_request(api.clone(), "GET", "/queue-stats/queue_b", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(stats["config"]["ordering"], "MinFirst"); assert_eq!(stats["config"]["allow_duplicates"], false); }
#[tokio::test]
async fn test_update_queue_nonexistent() {
let (api, _temp_dir) = create_test_api().await;
let update_request = json!({
"name": "nonexistent",
"config": { "allow_duplicates": false }
});
let (status, _body) =
make_request(api.clone(), "POST", "/update-queue", Some(update_request)).await;
assert_eq!(status, StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_delete_nonexistent_queue() {
let (api, _temp_dir) = create_test_api().await;
let (delete_status, delete_body) =
make_request(api, "DELETE", "/delete-queue/nonexistent", None).await;
assert_eq!(delete_status, StatusCode::OK);
assert_eq!(delete_body["queue"], "nonexistent");
assert_eq!(delete_body["deleted_messages"], 0);
}
#[tokio::test]
async fn test_purge_nonexistent_queue() {
let (api, _temp_dir) = create_test_api().await;
let (purge_status, purge_body) =
make_request(api, "POST", "/purge-queue/nonexistent", None).await;
assert_eq!(purge_status, StatusCode::OK);
assert_eq!(purge_body["queue"], "nonexistent");
assert_eq!(purge_body["purged_messages"], 0);
}
#[tokio::test]
async fn test_batch_ack_all_found() {
let (api, _temp_dir) = create_test_api().await;
let mut ids = Vec::new();
for i in 0..3 {
let (_, body) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({ "queue": "batch_q", "priority": 100, "payload": format!("p-{}", i) })),
)
.await;
ids.push(body["id"].as_str().unwrap().to_string());
}
for _ in 0..3 {
make_request(
api.clone(),
"POST",
"/consume/batch_q",
Some(json!({ "consumer_id": "worker-1", "timeout_seconds": 3600 })),
)
.await;
}
let (status, body) = make_request(
api.clone(),
"POST",
"/ack-batch/batch_q",
Some(json!({ "consumer_id": "worker-1", "message_ids": ids })),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["acked"].as_array().unwrap().len(), 3);
assert_eq!(body["not_found"].as_array().unwrap().len(), 0);
}
#[tokio::test]
async fn test_batch_ack_partial_not_found() {
let (api, _temp_dir) = create_test_api().await;
let (_, body) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({ "queue": "batch_q", "priority": 100, "payload": "real" })),
)
.await;
let real_id = body["id"].as_str().unwrap().to_string();
make_request(
api.clone(),
"POST",
"/consume/batch_q",
Some(json!({ "consumer_id": "worker-1", "timeout_seconds": 3600 })),
)
.await;
let (status, body) = make_request(
api.clone(),
"POST",
"/ack-batch/batch_q",
Some(json!({ "consumer_id": "worker-1", "message_ids": [real_id, "fake-id"] })),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["acked"].as_array().unwrap().len(), 1);
assert_eq!(body["not_found"].as_array().unwrap().len(), 1);
assert_eq!(body["not_found"][0], "fake-id");
}
#[tokio::test]
async fn test_batch_ack_empty_list() {
let (api, _temp_dir) = create_test_api().await;
let (status, body) = make_request(
api.clone(),
"POST",
"/ack-batch/batch_q",
Some(json!({ "consumer_id": "worker-1", "message_ids": [] })),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["acked"].as_array().unwrap().len(), 0);
assert_eq!(body["not_found"].as_array().unwrap().len(), 0);
}
#[tokio::test]
async fn test_batch_nack_all_unlocked() {
let (api, _temp_dir) = create_test_api().await;
let mut ids = Vec::new();
for i in 0..3 {
let (_, body) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({ "queue": "batch_q", "priority": 100, "payload": format!("p-{}", i) })),
)
.await;
ids.push(body["id"].as_str().unwrap().to_string());
}
for _ in 0..3 {
make_request(
api.clone(),
"POST",
"/consume/batch_q",
Some(json!({ "consumer_id": "worker-1", "timeout_seconds": 3600 })),
)
.await;
}
let (status, body) = make_request(
api.clone(),
"POST",
"/nack-batch/batch_q",
Some(json!({ "consumer_id": "worker-1", "message_ids": ids })),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["unlocked"].as_array().unwrap().len(), 3);
assert_eq!(body["dead_lettered"].as_array().unwrap().len(), 0);
assert_eq!(body["dropped"].as_array().unwrap().len(), 0);
assert_eq!(body["not_found"].as_array().unwrap().len(), 0);
}
#[tokio::test]
async fn test_batch_nack_missing_fields_rejected() {
let (api, _temp_dir) = create_test_api().await;
let (status, _) = make_request(
api.clone(),
"POST",
"/nack-batch/batch_q",
Some(json!({ "message_ids": ["id1"] })),
)
.await;
assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY);
}
#[tokio::test]
async fn test_purge_all_empties_all_queues() {
let (api, _temp_dir) = create_test_api().await;
for q in &["pa_q1", "pa_q2", "pa_q3"] {
make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({ "name": q, "config": { "ordering": "MaxFirst" } })),
)
.await;
for i in 0..3 {
make_request(
api.clone(),
"POST",
"/publish",
Some(json!({ "queue": q, "priority": i, "payload": format!("msg-{}", i) })),
)
.await;
}
}
let (status, body) = make_request(api.clone(), "POST", "/purge-all", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["purged_queues"], 3);
assert_eq!(body["purged_messages"], 9);
let (_, stats) = make_request(api.clone(), "GET", "/stats", None).await;
for q in stats["queues"].as_array().unwrap() {
assert_eq!(q["available"], 0);
assert_eq!(q["total"], 0);
}
}
#[tokio::test]
async fn test_purge_all_empty_server() {
let (api, _temp_dir) = create_test_api().await;
let (status, body) = make_request(api, "POST", "/purge-all", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["purged_queues"], 0);
assert_eq!(body["purged_messages"], 0);
}
#[tokio::test]
async fn test_delete_all_removes_all_queues() {
let (api, _temp_dir) = create_test_api().await;
for q in &["da_q1", "da_q2", "da_q3"] {
make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({ "name": q, "config": { "ordering": "MaxFirst" } })),
)
.await;
make_request(
api.clone(),
"POST",
"/publish",
Some(json!({ "queue": q, "priority": 1, "payload": "msg" })),
)
.await;
}
let (status, body) = make_request(api.clone(), "POST", "/delete-all", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["deleted_queues"], 3);
assert_eq!(body["deleted_messages"], 3);
let (_, queues) = make_request(api.clone(), "GET", "/queues", None).await;
assert_eq!(queues.as_array().unwrap().len(), 0);
}
#[tokio::test]
async fn test_delete_all_empty_server() {
let (api, _temp_dir) = create_test_api().await;
let (status, body) = make_request(api, "POST", "/delete-all", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["deleted_queues"], 0);
assert_eq!(body["deleted_messages"], 0);
}
#[tokio::test]
async fn test_delete_queue_with_dlq_allows_recreate() {
let (api, _temp_dir) = create_test_api().await;
make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({ "name": "dlq_q", "config": { "ordering": "MaxFirst" } })),
)
.await;
let (_, pub_body) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({ "queue": "dlq_q", "priority": 1, "payload": "dlq_msg", "max_retries": 0 })),
)
.await;
let msg_id = pub_body["id"].as_str().unwrap();
make_request(
api.clone(),
"POST",
"/consume/dlq_q",
Some(json!({ "consumer_id": "c1", "timeout_seconds": 30 })),
)
.await;
let nack_path = format!("/nack/dlq_q/{}", msg_id);
make_request(
api.clone(),
"POST",
&nack_path,
Some(json!({ "consumer_id": "c1" })),
)
.await;
let (status, _) = make_request(api.clone(), "DELETE", "/delete-queue/dlq_q", None).await;
assert_eq!(status, StatusCode::OK);
let (status, _) = make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({ "name": "dlq_q", "config": { "ordering": "MaxFirst" } })),
)
.await;
assert_eq!(status, StatusCode::OK);
}
#[tokio::test]
async fn test_delete_all_with_dlq_allows_recreate() {
let (api, _temp_dir) = create_test_api().await;
make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({ "name": "da_dlq", "config": { "ordering": "MaxFirst" } })),
)
.await;
let (_, pub_body) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({ "queue": "da_dlq", "priority": 1, "payload": "dlq_msg", "max_retries": 0 })),
)
.await;
let msg_id = pub_body["id"].as_str().unwrap();
make_request(
api.clone(),
"POST",
"/consume/da_dlq",
Some(json!({ "consumer_id": "c1", "timeout_seconds": 30 })),
)
.await;
let nack_path = format!("/nack/da_dlq/{}", msg_id);
make_request(
api.clone(),
"POST",
&nack_path,
Some(json!({ "consumer_id": "c1" })),
)
.await;
let (status, _) = make_request(api.clone(), "POST", "/delete-all", None).await;
assert_eq!(status, StatusCode::OK);
let (status, _) = make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({ "name": "da_dlq", "config": { "ordering": "MaxFirst" } })),
)
.await;
assert_eq!(status, StatusCode::OK);
}
#[tokio::test]
async fn test_update_queue_empty_name_returns_bad_request() {
let (api, _temp_dir) = create_test_api().await;
let (status, _) = make_request(
api,
"POST",
"/update-queue",
Some(json!({"name": " ", "config": {}})),
)
.await;
assert_eq!(status, StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_update_queue_rename_to_existing_returns_conflict() {
let (api, _temp_dir) = create_test_api().await;
make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({"name": "src-q", "config": {"ordering": "MaxFirst"}})),
)
.await;
make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({"name": "dst-q", "config": {"ordering": "MaxFirst"}})),
)
.await;
let (status, _) = make_request(
api,
"POST",
"/update-queue",
Some(json!({"name": "src-q", "config": {"name": "dst-q"}})),
)
.await;
assert_eq!(status, StatusCode::CONFLICT);
}
#[tokio::test]
async fn test_update_queue_rename_to_empty_string_returns_bad_request() {
let (api, _temp_dir) = create_test_api().await;
make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({"name": "renameq", "config": {"ordering": "MaxFirst"}})),
)
.await;
let (status, _) = make_request(
api,
"POST",
"/update-queue",
Some(json!({"name": "renameq", "config": {"name": ""}})),
)
.await;
assert_eq!(status, StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_text_priority_ordering_through_api() {
let (api, _temp_dir) = create_test_api().await;
let (status, _) = make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({
"name": "text_q",
"config": {
"ordering": "MinFirst",
"priority_kind": "Text"
}
})),
)
.await;
assert_eq!(status, StatusCode::OK);
for (prio, payload) in &[("charlie", "c"), ("alpha", "a"), ("bravo", "b")] {
let (status, _) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({
"queue": "text_q",
"priority": prio,
"payload": payload
})),
)
.await;
assert_eq!(status, StatusCode::OK);
}
for expected in &["a", "b", "c"] {
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/text_q",
Some(json!({"consumer_id": "w", "timeout_seconds": 30})),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["payload"].as_str().unwrap(), *expected);
}
}
#[tokio::test]
async fn test_text_priority_kind_mismatch_through_api() {
let (api, _temp_dir) = create_test_api().await;
let (status, _) = make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({
"name": "tq",
"config": {
"ordering": "MinFirst",
"priority_kind": "Text"
}
})),
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, _) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({
"queue": "tq",
"priority": 42,
"payload": "nope"
})),
)
.await;
assert_eq!(status, StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_text_priority_with_slash_succeeds() {
let (api, _temp_dir) = create_test_api().await;
let (status, _) = make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({
"name": "slash_q",
"config": {
"ordering": "MinFirst",
"priority_kind": "Text"
}
})),
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, body) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({
"queue": "slash_q",
"priority": "/usr/local/bin",
"payload": "path_payload"
})),
)
.await;
assert_eq!(status, StatusCode::OK);
assert!(
body["id"].as_str().is_some(),
"Expected message id in response"
);
let (status, body) = make_request(
api.clone(),
"POST",
"/consume/slash_q",
Some(json!({
"consumer_id": "c1",
"timeout": 30
})),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["payload"], "path_payload");
let (status, body) = make_request(api.clone(), "GET", "/queue-stats/slash_q", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["total"], 1);
}
#[tokio::test]
async fn test_text_priority_empty_returns_400() {
let (api, _temp_dir) = create_test_api().await;
let (status, _) = make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({
"name": "empty_q",
"config": {
"ordering": "MinFirst",
"priority_kind": "Text"
}
})),
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, body) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({
"queue": "empty_q",
"priority": "",
"payload": "test"
})),
)
.await;
assert_eq!(status, StatusCode::BAD_REQUEST);
let error_msg = body["error"].as_str().unwrap();
assert!(
error_msg.contains("Text priority must not be empty"),
"Expected validation error, got: {}",
error_msg
);
}
#[tokio::test]
async fn test_duplicate_payload_returns_400() {
let (api, _temp_dir) = create_test_api().await;
let (status, _) = make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({
"name": "nodup_q",
"config": {
"ordering": "MaxFirst",
"allow_duplicates": false
}
})),
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, _) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({
"queue": "nodup_q",
"priority": 1,
"payload": "same"
})),
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, body) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({
"queue": "nodup_q",
"priority": 2,
"payload": "same"
})),
)
.await;
assert_eq!(status, StatusCode::BAD_REQUEST);
let error_msg = body["error"].as_str().unwrap();
assert!(
error_msg.contains("Duplicate payload rejected"),
"Expected duplicate error, got: {}",
error_msg
);
}
#[tokio::test]
async fn test_operation_timings_returns_200_when_idle() {
let (api, _dir) = create_test_api().await;
let (status, body) = make_request(api, "GET", "/operation-timings", None).await;
assert_eq!(status, StatusCode::OK);
assert!(body.is_object(), "expected JSON object, got: {body}");
}
#[tokio::test]
async fn test_operation_timings_default_window() {
let (api, _dir) = create_test_api().await;
let (status, _) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({"queue":"t","priority":1,"payload":"x"})),
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, body) = make_request(api, "GET", "/operation-timings", None).await;
assert_eq!(status, StatusCode::OK);
let obj = body.as_object().unwrap();
assert!(
obj.contains_key("publish"),
"missing 'publish' key: {obj:?}"
);
assert!(
obj.contains_key("storage_push"),
"missing 'storage_push' key: {obj:?}"
);
let buckets = obj["publish"].as_array().unwrap();
assert!(!buckets.is_empty());
let b = &buckets[0];
assert!(b.get("t_epoch_s").is_some());
assert!(b.get("count").is_some());
assert!(b.get("total_us").is_some());
assert!(b.get("min_us").is_some());
assert!(b.get("max_us").is_some());
}
#[tokio::test]
async fn test_operation_timings_custom_seconds() {
let (api, _dir) = create_test_api().await;
make_request(
api.clone(),
"POST",
"/publish",
Some(json!({"queue":"t","priority":1,"payload":"x"})),
)
.await;
let (status, body) = make_request(api, "GET", "/operation-timings?seconds=10", None).await;
assert_eq!(status, StatusCode::OK);
let obj = body.as_object().unwrap();
if let Some(buckets) = obj.get("publish").and_then(|v| v.as_array()) {
assert!(
buckets.len() <= 10,
"expected <=10 buckets, got {}",
buckets.len()
);
}
}
#[tokio::test]
async fn test_operation_timings_covers_all_api_operations() {
let (api, _dir) = create_test_api().await;
make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({"name":"ops","ordering":"MaxFirst"})),
)
.await;
make_request(
api.clone(),
"POST",
"/publish",
Some(json!({"queue":"ops","priority":1,"payload":"p1"})),
)
.await;
make_request(
api.clone(),
"POST",
"/publish",
Some(json!({"queue":"ops","priority":2,"payload":"p2"})),
)
.await;
let (status, consume_body) = make_request(
api.clone(),
"POST",
"/consume/ops",
Some(json!({"consumer_id":"c1","timeout_seconds":30})),
)
.await;
assert_eq!(status, StatusCode::OK);
let msg_id = consume_body["id"].as_str().unwrap().to_string();
let (status, _) = make_request(
api.clone(),
"POST",
&format!("/ack/ops/{msg_id}"),
Some(json!({"consumer_id":"c1"})),
)
.await;
assert_eq!(status, StatusCode::OK);
let (_, consume_body2) = make_request(
api.clone(),
"POST",
"/consume/ops",
Some(json!({"consumer_id":"c1","timeout_seconds":30})),
)
.await;
let msg_id2 = consume_body2["id"].as_str().unwrap().to_string();
make_request(
api.clone(),
"POST",
&format!("/nack/ops/{msg_id2}"),
Some(json!({"consumer_id":"c1"})),
)
.await;
make_request(api.clone(), "GET", "/queue-stats/ops", None).await;
let (status, body) = make_request(api, "GET", "/operation-timings?seconds=60", None).await;
assert_eq!(status, StatusCode::OK);
let obj = body.as_object().unwrap();
let expected_ops = [
"publish",
"consume",
"ack",
"nack",
"queue_stats",
"storage_push",
"storage_pop",
"storage_ack",
"storage_nack",
"storage_get_queue_stats",
];
for op in &expected_ops {
assert!(
obj.contains_key(*op),
"missing operation '{}' in timings: {:?}",
op,
obj.keys().collect::<Vec<_>>()
);
let buckets = obj[*op].as_array().unwrap();
let total_count: u64 = buckets.iter().map(|b| b["count"].as_u64().unwrap()).sum();
assert!(
total_count > 0,
"operation '{}' should have at least 1 sample, got 0",
op
);
}
}
#[tokio::test]
async fn test_operation_timings_responsive_without_storage() {
let (api, _dir) = create_test_api().await;
let start = std::time::Instant::now();
let (status, _) = make_request(api, "GET", "/operation-timings?seconds=3600", None).await;
let elapsed = start.elapsed();
assert_eq!(status, StatusCode::OK);
assert!(
elapsed.as_millis() < 500,
"endpoint took too long: {:?}",
elapsed
);
}
#[tokio::test]
async fn test_force_unlock_queue_endpoint() {
let (api, _temp_dir) = create_test_api().await;
make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({ "name": "fu_api_q", "config": { "ordering": "MaxFirst" } })),
)
.await;
for i in 0..3 {
make_request(
api.clone(),
"POST",
"/publish",
Some(json!({ "queue": "fu_api_q", "priority": i, "payload": format!("msg-{}", i) })),
)
.await;
}
for _ in 0..3 {
make_request(
api.clone(),
"POST",
"/consume/fu_api_q",
Some(json!({ "consumer_id": "test", "timeout_seconds": 300 })),
)
.await;
}
let (_, stats) = make_request(api.clone(), "GET", "/queue-stats/fu_api_q", None).await;
assert_eq!(stats["locked"], 3);
let (status, body) = make_request(api.clone(), "POST", "/force-unlock/fu_api_q", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["queue"], "fu_api_q");
assert_eq!(body["unlocked"], 3);
let (_, stats) = make_request(api.clone(), "GET", "/queue-stats/fu_api_q", None).await;
assert_eq!(stats["locked"], 0);
assert_eq!(stats["available"], 3);
}
#[tokio::test]
async fn test_force_unlock_queue_endpoint_nothing_locked() {
let (api, _temp_dir) = create_test_api().await;
make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({ "name": "fu_empty", "config": { "ordering": "MaxFirst" } })),
)
.await;
make_request(
api.clone(),
"POST",
"/publish",
Some(json!({ "queue": "fu_empty", "priority": 1, "payload": "test" })),
)
.await;
let (status, body) = make_request(api.clone(), "POST", "/force-unlock/fu_empty", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["unlocked"], 0);
}
#[tokio::test]
async fn test_stats_includes_memory_fields() {
let (api, _temp_dir) = create_test_api().await;
let (status, body) = make_request(api, "GET", "/stats", None).await;
assert_eq!(status, StatusCode::OK);
let summary = &body["summary"];
assert!(
summary["memory_usage_bytes"].is_number(),
"summary.memory_usage_bytes should be a number, got {:?}",
summary["memory_usage_bytes"]
);
assert!(
summary["memory_limit_bytes"].is_number(),
"summary.memory_limit_bytes should be a number, got {:?}",
summary["memory_limit_bytes"]
);
assert!(
summary["memory_pressure"].is_boolean(),
"summary.memory_pressure should be a boolean, got {:?}",
summary["memory_pressure"]
);
}