use axum::{
body::Body,
http::{Request, StatusCode},
};
use qrusty::api::{ApiServer, StorageApi};
use qrusty::message::{Message, QueueConfig, QueueStats};
use serde_json::Value;
use std::sync::Arc;
use tower::ServiceExt;
#[derive(Clone, Default)]
struct FailingStorage {
delete_queue_error: bool,
purge_queue_error: bool,
force_unlock_queue_error: bool,
list_queues_error: bool,
get_all_queue_stats_error: bool,
get_queue_stats_error: bool,
}
#[async_trait::async_trait]
impl StorageApi for FailingStorage {
async fn queue_exists(&self, _queue_name: &str) -> anyhow::Result<bool> {
Ok(false)
}
async fn create_queue(&self, _queue_name: &str, _config: QueueConfig) {}
async fn rename_queue(
&self,
_from: &str,
_to: &str,
) -> std::result::Result<(), qrusty::storage::RenameQueueError> {
Ok(())
}
async fn push(&self, _msg: Message) -> anyhow::Result<String> {
Ok("id".to_string())
}
async fn pop(
&self,
_queue: &str,
_consumer_id: &str,
_timeout_secs: u64,
) -> anyhow::Result<Option<Message>> {
Ok(None)
}
async fn ack(
&self,
_queue: &str,
_message_id: &str,
_consumer_id: &str,
) -> anyhow::Result<bool> {
Ok(false)
}
async fn nack(
&self,
_queue: &str,
_message_id: &str,
_consumer_id: &str,
) -> anyhow::Result<bool> {
Ok(false)
}
async fn nack_with_delay(
&self,
_queue: &str,
_message_id: &str,
_consumer_id: &str,
_delay_secs: u64,
) -> anyhow::Result<bool> {
Ok(false)
}
async fn batch_ack(
&self,
_queue: &str,
_consumer_id: &str,
_message_ids: &[String],
) -> anyhow::Result<qrusty::message::BatchAckResult> {
Ok(qrusty::message::BatchAckResult::default())
}
async fn batch_nack(
&self,
_queue: &str,
_consumer_id: &str,
_message_ids: &[String],
) -> anyhow::Result<qrusty::message::BatchNackResult> {
Ok(qrusty::message::BatchNackResult::default())
}
async fn renew(
&self,
_queue: &str,
_message_id: &str,
_consumer_id: &str,
_timeout_secs: u64,
) -> anyhow::Result<bool> {
Ok(false)
}
async fn delete_queue(&self, _queue_name: &str) -> anyhow::Result<usize> {
if self.delete_queue_error {
return Err(anyhow::anyhow!("delete_queue failed"));
}
Ok(0)
}
async fn purge_queue(&self, _queue_name: &str) -> anyhow::Result<usize> {
if self.purge_queue_error {
return Err(anyhow::anyhow!("purge_queue failed"));
}
Ok(0)
}
async fn get_all_queue_stats(&self) -> anyhow::Result<Vec<QueueStats>> {
if self.get_all_queue_stats_error {
return Err(anyhow::anyhow!("get_all_queue_stats failed"));
}
Ok(Vec::new())
}
async fn get_queue_stats(&self, queue_name: &str) -> anyhow::Result<QueueStats> {
if self.get_queue_stats_error {
return Err(anyhow::anyhow!("get_queue_stats failed"));
}
Ok(QueueStats {
name: queue_name.to_string(),
available: 0,
locked: 0,
total: 0,
config: QueueConfig::default(),
})
}
async fn list_queues(&self) -> anyhow::Result<Vec<String>> {
if self.list_queues_error {
return Err(anyhow::anyhow!("list_queues failed"));
}
Ok(Vec::new())
}
async fn unlock_expired_messages(&self) -> anyhow::Result<usize> {
Ok(0)
}
async fn force_unlock_queue(&self, _queue_name: &str) -> anyhow::Result<usize> {
if self.force_unlock_queue_error {
anyhow::bail!("force_unlock_queue error");
}
Ok(0)
}
}
async fn make_request(api: ApiServer, method: &str, path: &str) -> (StatusCode, Value) {
let app = api.router();
let request = Request::builder()
.method(method)
.uri(path)
.body(Body::empty())
.unwrap();
let response = app.oneshot(request).await.unwrap();
let status = response.status();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let body_str = String::from_utf8(body.to_vec()).unwrap();
let json_body: Value = if body_str.is_empty() {
Value::Null
} else {
serde_json::from_str(&body_str).unwrap_or(Value::String(body_str))
};
(status, json_body)
}
#[tokio::test]
async fn test_delete_queue_error_returns_500_with_json_error() {
let storage = Arc::new(FailingStorage {
delete_queue_error: true,
..Default::default()
});
let api = ApiServer::new(storage);
let (status, body) = make_request(api, "DELETE", "/delete-queue/somequeue").await;
assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
assert!(body["error"]
.as_str()
.unwrap()
.contains("Failed to delete queue"));
}
#[tokio::test]
async fn test_purge_queue_error_returns_500_with_json_error() {
let storage = Arc::new(FailingStorage {
purge_queue_error: true,
..Default::default()
});
let api = ApiServer::new(storage);
let (status, body) = make_request(api, "POST", "/purge-queue/somequeue").await;
assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
assert!(body["error"]
.as_str()
.unwrap()
.contains("Failed to purge queue"));
}
#[tokio::test]
async fn test_queue_stats_storage_error_returns_404() {
let storage = Arc::new(FailingStorage {
get_queue_stats_error: true,
..Default::default()
});
let api = ApiServer::new(storage);
let (status, _body) = make_request(api, "GET", "/queue-stats/somequeue").await;
assert_eq!(status, StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_queues_storage_error_returns_500() {
let storage = Arc::new(FailingStorage {
list_queues_error: true,
..Default::default()
});
let api = ApiServer::new(storage);
let (status, _body) = make_request(api, "GET", "/queues").await;
assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
}
#[tokio::test]
async fn test_stats_storage_error_returns_500() {
let storage = Arc::new(FailingStorage {
get_all_queue_stats_error: true,
..Default::default()
});
let api = ApiServer::new(storage);
let (status, _body) = make_request(api, "GET", "/stats").await;
assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
}
#[tokio::test]
async fn test_force_unlock_queue_error_returns_500() {
let storage = Arc::new(FailingStorage {
force_unlock_queue_error: true,
..Default::default()
});
let api = ApiServer::new(storage);
let (status, body) = make_request(api, "POST", "/force-unlock/some_queue").await;
assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
assert!(body["error"].as_str().unwrap().contains("force-unlock"));
}