use axum::{
body::Body,
http::{Request, StatusCode},
};
use qrusty::{api::ApiServer, storage::Storage};
use serde_json::{json, Value};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::time::{sleep, Duration};
use tower::ServiceExt;
/// Builds an isolated [`ApiServer`] whose [`Storage`] lives in a freshly
/// created temporary directory.
///
/// The [`TempDir`] handle is returned together with the server so the caller
/// decides how long the backing directory stays around; callers bind it to a
/// local (e.g. `_temp_dir`) for the duration of the test.
///
/// # Panics
///
/// Panics if the temporary directory cannot be created, if its path is not
/// valid UTF-8, or if the storage cannot be opened at that path.
async fn create_test_system() -> (ApiServer, TempDir) {
    let scratch = TempDir::new().expect("Failed to create temp directory");
    let data_path = scratch.path().to_str().unwrap();
    let store = Storage::new(data_path).expect("Failed to create test storage");
    (ApiServer::new(Arc::new(store)), scratch)
}
/// Sends one request through the API router and returns the response status
/// together with the response body decoded as JSON.
///
/// * `api` - server whose router handles the request; a router is obtained
///   via `api.router()` on every call because `oneshot` consumes it.
/// * `method` / `path` - HTTP method and request URI.
/// * `body` - optional JSON document, serialized as the request body. The
///   `content-type: application/json` header is sent either way.
///
/// The returned [`Value`] is:
/// * [`Value::Null`] when the response body is empty,
/// * the parsed document when the body is valid JSON,
/// * a [`Value::String`] holding the raw text otherwise.
///
/// # Panics
///
/// Panics if the request cannot be built, the router call fails, the body
/// cannot be read, or the body is not valid UTF-8.
async fn make_request(
    api: &ApiServer,
    method: &str,
    path: &str,
    body: Option<Value>,
) -> (StatusCode, Value) {
    let router = api.router();

    let payload = match body {
        Some(document) => Body::from(document.to_string()),
        None => Body::empty(),
    };
    let request = Request::builder()
        .method(method)
        .uri(path)
        .header("content-type", "application/json")
        .body(payload)
        .unwrap();

    let response = router.oneshot(request).await.unwrap();
    let status = response.status();

    let raw = axum::body::to_bytes(response.into_body(), usize::MAX)
        .await
        .unwrap();
    let text = String::from_utf8(raw.to_vec()).unwrap();

    let decoded: Value = if text.is_empty() {
        Value::Null
    } else {
        // Non-JSON responses are handed back verbatim as a JSON string.
        match serde_json::from_str::<Value>(&text) {
            Ok(parsed) => parsed,
            Err(_) => Value::String(text),
        }
    };
    (status, decoded)
}
/// Order-processing workflow over the `orders` queue.
///
/// Publishes one priority-1000 order followed by two priority-100 orders,
/// then checks that:
/// 1. the priority-1000 order (`ORD-001`) is the first one consumed,
/// 2. each consumed order can be acknowledged by the consuming consumer,
/// 3. the two remaining orders are delivered as JSON objects, and
/// 4. a further consume returns `200 OK` with an empty body.
#[tokio::test]
async fn test_order_processing_workflow() {
    let (api, _temp_dir) = create_test_system().await;

    // Publication order matters: the urgent order goes first, then the regular ones.
    let orders_to_publish = [
        json!({
            "queue": "orders",
            "priority": 1000,
            "payload": json!({
                "order_id": "ORD-001",
                "customer_id": "CUST-123",
                "items": [
                    {"sku": "PHONE-X", "quantity": 1, "price": 999.99}
                ],
                "total": 999.99,
                "priority": "urgent",
                "created_at": "2025-08-19T10:00:00Z"
            }).to_string(),
            "max_retries": 5
        }),
        json!({
            "queue": "orders",
            "priority": 100,
            "payload": json!({
                "order_id": "ORD-002",
                "customer_id": "CUST-456",
                "items": [{"sku": "BOOK-A", "quantity": 2, "price": 29.99}],
                "total": 59.98
            }).to_string()
        }),
        json!({
            "queue": "orders",
            "priority": 100,
            "payload": json!({
                "order_id": "ORD-003",
                "customer_id": "CUST-789",
                "items": [{"sku": "SHIRT-B", "quantity": 1, "price": 39.99}],
                "total": 39.99
            }).to_string()
        }),
    ];
    for order in orders_to_publish {
        let (publish_status, _) = make_request(&api, "POST", "/publish", Some(order)).await;
        assert_eq!(publish_status, StatusCode::OK);
    }

    let consume_request = json!({
        "consumer_id": "order-processor-1",
        "timeout_seconds": 60
    });

    // First delivery must be the urgent order.
    let (consume_status, delivered) = make_request(
        &api,
        "POST",
        "/consume/orders",
        Some(consume_request.clone()),
    )
    .await;
    assert_eq!(consume_status, StatusCode::OK);
    let payload_text = delivered["payload"].as_str().unwrap();
    let first_order: Value = serde_json::from_str(payload_text).unwrap();
    assert_eq!(first_order["order_id"], "ORD-001");
    assert_eq!(first_order["priority"], "urgent");

    let first_ack_path = format!("/ack/orders/{}", delivered["id"].as_str().unwrap());
    let (ack_status, _) = make_request(
        &api,
        "POST",
        &first_ack_path,
        Some(json!({"consumer_id": "order-processor-1"})),
    )
    .await;
    assert_eq!(ack_status, StatusCode::OK);

    // The two regular orders are consumed and acknowledged one at a time.
    for _regular_order in 0..2 {
        let (consume_status, delivered) = make_request(
            &api,
            "POST",
            "/consume/orders",
            Some(consume_request.clone()),
        )
        .await;
        assert_eq!(consume_status, StatusCode::OK);
        assert!(delivered.is_object());

        let ack_path = format!("/ack/orders/{}", delivered["id"].as_str().unwrap());
        let (ack_status, _) = make_request(
            &api,
            "POST",
            &ack_path,
            Some(json!({"consumer_id": "order-processor-1"})),
        )
        .await;
        assert_eq!(ack_status, StatusCode::OK);
    }

    // Nothing is left: the endpoint still answers 200 but with an empty body.
    let (final_status, leftover) =
        make_request(&api, "POST", "/consume/orders", Some(consume_request)).await;
    assert_eq!(final_status, StatusCode::OK);
    assert_eq!(leftover, Value::Null);
}
/// Retry handling for a message that keeps failing.
///
/// Publishes a single message with `max_retries: 2` to the `processing`
/// queue, then consumes and negatively acknowledges it twice. On each
/// delivery the test checks that the delivered message is the one that was
/// published and that `retry_count` is 1 and then 2. After the second nack a
/// further consume must return `200 OK` with an empty body.
#[tokio::test]
async fn test_failed_message_processing() {
    let (api, _temp_dir) = create_test_system().await;

    let problematic_message = json!({
        "queue": "processing",
        "priority": 100,
        "payload": json!({
            "task": "process_payment",
            "payment_id": "PAY-001",
            "amount": 100.00,
            "card_token": "invalid_token"
        }).to_string(),
        "max_retries": 2
    });
    let (status, publish_body) =
        make_request(&api, "POST", "/publish", Some(problematic_message)).await;
    assert_eq!(status, StatusCode::OK);
    let message_id = publish_body["id"].as_str().unwrap();

    let consume_request = json!({
        "consumer_id": "payment-processor-1",
        "timeout_seconds": 30
    });
    // Every nack targets the published id, so the path can be built once.
    let nack_path = format!("/nack/processing/{}", message_id);

    for expected_retry_count in [1_i64, 2] {
        let (status, consume_body) = make_request(
            &api,
            "POST",
            "/consume/processing",
            Some(consume_request.clone()),
        )
        .await;
        assert_eq!(status, StatusCode::OK);
        // The nack below is addressed to `message_id`; make sure that is
        // really the message that was just delivered, on every attempt.
        assert_eq!(consume_body["id"], message_id);
        assert_eq!(consume_body["retry_count"], expected_retry_count);

        let nack_request = json!({"consumer_id": "payment-processor-1"});
        let (status, _) = make_request(&api, "POST", &nack_path, Some(nack_request)).await;
        assert_eq!(status, StatusCode::OK);
    }

    // After the second nack the message is no longer delivered.
    let (status, body) =
        make_request(&api, "POST", "/consume/processing", Some(consume_request)).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body, Value::Null);
}
/// Several concurrent consumers draining one queue.
///
/// Publishes 20 tasks to the `tasks` queue (priorities cycle through
/// 100, 99, 98, 97, 96), then spawns 4 worker tasks that each loop
/// consume -> short sleep -> ack until a consume returns a non-OK status or an
/// empty body. The ids collected by all workers must be exactly
/// `TASK-000` ..= `TASK-019`, each once, and the queue must be empty
/// afterwards.
#[tokio::test]
async fn test_multi_consumer_load_balancing() {
    let (api, _temp_dir) = create_test_system().await;

    let task_count: usize = 20;
    for i in 0..task_count {
        let task = json!({
            "queue": "tasks",
            "priority": 100 - (i % 5),
            "payload": json!({
                "task_id": format!("TASK-{:03}", i),
                "type": "data_processing",
                "data": format!("data_chunk_{}", i)
            }).to_string()
        });
        let (status, _) = make_request(&api, "POST", "/publish", Some(task)).await;
        assert_eq!(status, StatusCode::OK);
    }

    let worker_count = 4;
    let mut handles = vec![];
    for worker_id in 0..worker_count {
        let api_clone = api.clone();
        let handle = tokio::spawn(async move {
            let mut processed_tasks = vec![];
            let consumer_id = format!("worker-{}", worker_id);
            loop {
                let consume_request = json!({
                    "consumer_id": consumer_id,
                    "timeout_seconds": 30
                });
                let (status, body) =
                    make_request(&api_clone, "POST", "/consume/tasks", Some(consume_request))
                        .await;
                // A worker stops as soon as the queue has nothing for it.
                if status != StatusCode::OK || body == Value::Null {
                    break;
                }

                let task_payload: Value =
                    serde_json::from_str(body["payload"].as_str().unwrap()).unwrap();
                processed_tasks.push(task_payload["task_id"].as_str().unwrap().to_string());

                // Simulated processing time, so workers interleave.
                sleep(Duration::from_millis(10)).await;

                let ack_request = json!({"consumer_id": consumer_id});
                let ack_path = format!("/ack/tasks/{}", body["id"].as_str().unwrap());
                let (status, _) =
                    make_request(&api_clone, "POST", &ack_path, Some(ack_request)).await;
                assert_eq!(status, StatusCode::OK);
            }
            processed_tasks
        });
        handles.push(handle);
    }

    let mut all_processed_tasks = vec![];
    for handle in handles {
        let worker_tasks = handle.await.unwrap();
        all_processed_tasks.extend(worker_tasks);
    }
    assert_eq!(all_processed_tasks.len(), task_count);

    // Zero-padded ids sort lexicographically in numeric order, so the sorted
    // result must equal the generated list exactly. One comparison covers
    // "every task seen" and "no task seen twice", and on failure prints both
    // lists instead of a bare `assertion failed`.
    all_processed_tasks.sort_unstable();
    let expected_task_ids: Vec<String> = (0..task_count)
        .map(|i| format!("TASK-{:03}", i))
        .collect();
    assert_eq!(all_processed_tasks, expected_task_ids);

    let consume_request = json!({
        "consumer_id": "final-check",
        "timeout_seconds": 1
    });
    let (status, body) = make_request(&api, "POST", "/consume/tasks", Some(consume_request)).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body, Value::Null);
}
/// Independent queues used side by side.
///
/// Publishes 10 messages to `emails`, 5 to `sms` and 3 to `alerts`, then
/// drains `alerts`, `sms` and `emails` (in that order) with a dedicated
/// consumer per queue. Every delivered payload must carry the `type` that
/// was published to that queue, every publish and ack must answer `200 OK`,
/// and each queue must be empty at the end.
#[tokio::test]
async fn test_mixed_queue_operations() {
    let (api, _temp_dir) = create_test_system().await;

    for i in 0..10 {
        let email = json!({
            "queue": "emails",
            "priority": 10,
            "payload": json!({
                "type": "welcome_email",
                "user_id": format!("USER-{}", i),
                "template": "welcome_template"
            }).to_string()
        });
        let (status, _) = make_request(&api, "POST", "/publish", Some(email)).await;
        assert_eq!(status, StatusCode::OK);
    }
    for i in 0..5 {
        let sms = json!({
            "queue": "sms",
            "priority": 50,
            "payload": json!({
                "type": "verification_code",
                "phone": format!("+1555000{:04}", i),
                "code": format!("{:06}", i * 123456)
            }).to_string()
        });
        let (status, _) = make_request(&api, "POST", "/publish", Some(sms)).await;
        assert_eq!(status, StatusCode::OK);
    }
    for i in 0..3 {
        let alert = json!({
            "queue": "alerts",
            "priority": 1000,
            "payload": json!({
                "type": "security_breach",
                "severity": "critical",
                "incident_id": format!("INC-{}", i)
            }).to_string()
        });
        let (status, _) = make_request(&api, "POST", "/publish", Some(alert)).await;
        assert_eq!(status, StatusCode::OK);
    }

    // (queue, consumer id, expected payload type, number of published messages),
    // listed in the order the queues are drained.
    let drains = [
        ("alerts", "alert-handler", "security_breach", 3),
        ("sms", "sms-sender", "verification_code", 5),
        ("emails", "email-sender", "welcome_email", 10),
    ];
    for (queue, consumer_id, expected_type, message_count) in drains {
        let consume_path = format!("/consume/{}", queue);
        for _ in 0..message_count {
            let consume_request = json!({
                "consumer_id": consumer_id,
                "timeout_seconds": 30
            });
            let (status, body) =
                make_request(&api, "POST", &consume_path, Some(consume_request)).await;
            assert_eq!(status, StatusCode::OK);
            let payload: Value =
                serde_json::from_str(body["payload"].as_str().unwrap()).unwrap();
            assert_eq!(payload["type"], expected_type);

            let ack_request = json!({"consumer_id": consumer_id});
            let ack_path = format!("/ack/{}/{}", queue, body["id"].as_str().unwrap());
            // A failed ack would otherwise go unnoticed: the message stays
            // held by this consumer, so the emptiness check below could
            // still see an empty queue.
            let (status, _) = make_request(&api, "POST", &ack_path, Some(ack_request)).await;
            assert_eq!(status, StatusCode::OK);
        }
    }

    for (queue, ..) in drains {
        let consume_request = json!({
            "consumer_id": "final-check",
            "timeout_seconds": 1
        });
        let (status, body) = make_request(
            &api,
            "POST",
            &format!("/consume/{}", queue),
            Some(consume_request),
        )
        .await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(body, Value::Null);
    }
}
/// Health and stats endpoints before and after publishing a batch of messages.
///
/// Polls `GET /health` ten times on an empty system, publishes 100 messages
/// (priorities 0..=99) to the `load_test` queue, and then checks that
/// `/health` still reports `"status": "ok"` and that `/stats` returns a JSON
/// object containing the `queues` and `summary` keys.
#[tokio::test]
async fn test_system_health_monitoring() {
    let (api, _temp_dir) = create_test_system().await;

    for _ in 0..10 {
        let (status, body) = make_request(&api, "GET", "/health", None).await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(body["status"], Value::String("ok".to_string()));
    }

    for i in 0..100 {
        let message = json!({
            "queue": "load_test",
            "priority": i,
            "payload": format!("{{\"load_test\": {}}}", i)
        });
        // The later health/stats checks are only meaningful if the load was
        // actually accepted, so a rejected publish must fail the test.
        let (status, _) = make_request(&api, "POST", "/publish", Some(message)).await;
        assert_eq!(status, StatusCode::OK);
    }

    let (status, body) = make_request(&api, "GET", "/health", None).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body["status"], Value::String("ok".to_string()));

    let (status, body) = make_request(&api, "GET", "/stats", None).await;
    assert_eq!(status, StatusCode::OK);
    assert!(body.is_object());
    assert!(body.get("queues").is_some());
    assert!(body.get("summary").is_some());
}
/// Messages survive dropping the server and reopening the same data directory.
///
/// Phase 1 opens a [`Storage`] in a temporary directory, publishes three
/// messages and lets the server go out of scope. Phase 2 opens a new
/// [`Storage`] on the same path and checks the delivery sequence:
/// `queue1` yields `message2` (priority 200) and then `message1`
/// (priority 100), and `queue2` yields `message3`. The consumed messages are
/// not acknowledged.
#[tokio::test]
async fn test_data_persistence_across_restarts() {
    let temp_dir = TempDir::new().unwrap();
    let data_path = temp_dir.path().to_str().unwrap();

    // Phase 1: publish, then drop the server at the end of the scope.
    {
        let api = ApiServer::new(Arc::new(Storage::new(data_path).unwrap()));
        let to_publish = [
            ("queue1", 100, "message1"),
            ("queue1", 200, "message2"),
            ("queue2", 150, "message3"),
        ];
        for (queue, priority, data) in to_publish {
            let publish_request = json!({
                "queue": queue,
                "priority": priority,
                "payload": json!({"data": data}).to_string()
            });
            let (publish_status, _) =
                make_request(&api, "POST", "/publish", Some(publish_request)).await;
            assert_eq!(publish_status, StatusCode::OK);
        }
    }

    // Phase 2: a new server on the same directory must still hold the messages.
    {
        let api = ApiServer::new(Arc::new(Storage::new(data_path).unwrap()));
        let expected_deliveries = [
            ("/consume/queue1", "message2"),
            ("/consume/queue1", "message1"),
            ("/consume/queue2", "message3"),
        ];
        for (consume_path, expected_data) in expected_deliveries {
            let consume_request = json!({
                "consumer_id": "persistence-test",
                "timeout_seconds": 30
            });
            let (consume_status, delivered) =
                make_request(&api, "POST", consume_path, Some(consume_request)).await;
            assert_eq!(consume_status, StatusCode::OK);

            let payload_text = delivered["payload"].as_str().unwrap();
            let payload: Value = serde_json::from_str(payload_text).unwrap();
            assert_eq!(payload["data"], expected_data);
        }
    }
}