use axum::body::Body;
use axum::http::{Request, StatusCode};
use qrusty::api::ApiServer;
use qrusty::memory_storage::MemoryStorage;
use serde_json::{json, Value};
use std::sync::Arc;
use tower::ServiceExt;
fn init_tracing() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_test_writer()
.try_init();
}
async fn make_request(
api: ApiServer,
method: &str,
path: &str,
body: Option<Value>,
) -> (StatusCode, Value) {
let app = api.router();
let request_builder = Request::builder()
.method(method)
.uri(path)
.header("content-type", "application/json");
let request = if let Some(body_json) = body {
request_builder
.body(Body::from(body_json.to_string()))
.unwrap()
} else {
request_builder.body(Body::empty()).unwrap()
};
let response = app.oneshot(request).await.unwrap();
let status = response.status();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8(body.to_vec()).unwrap();
let json_body: Value = if body_str.is_empty() {
Value::Null
} else {
serde_json::from_str(&body_str).unwrap_or(Value::String(body_str))
};
(status, json_body)
}
#[tokio::test]
async fn test_api_with_memory_storage_publish_consume_ack() {
init_tracing();
let storage = Arc::new(MemoryStorage::new());
let api = ApiServer::new(storage);
let (status, _) = make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({"name": "test_q", "config": {"ordering": "MaxFirst"}})),
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, publish_body) = make_request(
api.clone(),
"POST",
"/publish",
Some(json!({
"queue": "test_q",
"priority": 100,
"payload": "{\"order_id\": 1}",
"max_retries": 3
})),
)
.await;
assert_eq!(status, StatusCode::OK);
let message_id = publish_body["id"].as_str().unwrap().to_string();
let (status, consume_body) = make_request(
api.clone(),
"POST",
"/consume/test_q",
Some(json!({"consumer_id": "worker-1", "timeout_seconds": 30})),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(consume_body["id"].as_str().unwrap(), &message_id);
let (status, _) = make_request(
api.clone(),
"POST",
&format!("/ack/test_q/{}", message_id),
Some(json!({"consumer_id": "worker-1"})),
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, stats) = make_request(api.clone(), "GET", "/queue-stats/test_q", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(stats["total"].as_u64().unwrap(), 0);
}
#[tokio::test]
async fn test_api_with_memory_storage_health() {
init_tracing();
let storage = Arc::new(MemoryStorage::new());
let api = ApiServer::new(storage);
let (status, body) = make_request(api, "GET", "/health", None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["status"].as_str().unwrap(), "ok");
}
#[tokio::test]
async fn test_api_with_memory_storage_stats() {
init_tracing();
let storage = Arc::new(MemoryStorage::new());
let api = ApiServer::new(storage);
make_request(
api.clone(),
"POST",
"/create-queue",
Some(json!({"name": "q1", "config": {"ordering": "MaxFirst"}})),
)
.await;
make_request(
api.clone(),
"POST",
"/publish",
Some(json!({"queue": "q1", "priority": 1, "payload": "a", "max_retries": 3})),
)
.await;
make_request(
api.clone(),
"POST",
"/publish",
Some(json!({"queue": "q1", "priority": 2, "payload": "b", "max_retries": 3})),
)
.await;
let (status, stats) = make_request(api.clone(), "GET", "/stats", None).await;
assert_eq!(status, StatusCode::OK);
let queues = stats["queues"].as_array().unwrap();
assert_eq!(queues.len(), 1);
assert_eq!(queues[0]["name"].as_str().unwrap(), "q1");
assert_eq!(queues[0]["total"].as_u64().unwrap(), 2);
}
#[tokio::test]
async fn test_timeout_monitor_with_memory_storage() {
use qrusty::api::StorageApi;
use tokio::time::Duration;
init_tracing();
let storage = Arc::new(MemoryStorage::new());
let msg = qrusty::message::Message {
id: uuid::Uuid::new_v4().to_string(),
queue: "monitor_q".to_string(),
priority: qrusty::message::Priority::Numeric(1),
payload: "test".to_string(),
created_at: chrono::Utc::now(),
locked_until: None,
locked_by: None,
retry_count: 0,
max_retries: 3,
payload_ref: None,
payload_hash: None,
};
let msg_id = msg.id.clone();
storage.push(msg).await.unwrap();
storage.pop("monitor_q", "c1", 1).await.unwrap().unwrap();
tokio::time::sleep(Duration::from_secs(2)).await;
let unlocked = storage.unlock_expired_messages().await.unwrap();
assert_eq!(unlocked, 1);
let recovered = storage.pop("monitor_q", "c2", 30).await.unwrap().unwrap();
assert_eq!(recovered.id, msg_id);
}