// qrusty 0.20.8
//
// A trusty priority queue server built with Rust.
// Documentation
// tests/api_error_tests.rs

//! Targeted API tests that force storage-layer errors to ensure
//! handler error branches return the expected HTTP status codes.

use axum::{
    body::Body,
    http::{Request, StatusCode},
};
use qrusty::api::{ApiServer, StorageApi};
use qrusty::message::{Message, QueueConfig, QueueStats};
use serde_json::Value;
use std::sync::Arc;
use tower::ServiceExt; // for `oneshot`

/// Test double for the storage layer: each flag, when `true`, makes the
/// correspondingly named storage method return an error instead of its
/// placeholder success value. All flags default to `false`.
#[derive(Clone, Default)]
struct FailingStorage {
    /// When set, `delete_queue` returns an error.
    delete_queue_error: bool,
    /// When set, `purge_queue` returns an error.
    purge_queue_error: bool,
    /// When set, `force_unlock_queue` returns an error.
    force_unlock_queue_error: bool,
    /// When set, `list_queues` returns an error.
    list_queues_error: bool,
    /// When set, `get_all_queue_stats` returns an error.
    get_all_queue_stats_error: bool,
    /// When set, `get_queue_stats` returns an error.
    get_queue_stats_error: bool,
}

#[async_trait::async_trait]
impl StorageApi for FailingStorage {
    async fn queue_exists(&self, _queue_name: &str) -> anyhow::Result<bool> {
        Ok(false)
    }

    async fn create_queue(&self, _queue_name: &str, _config: QueueConfig) {}

    async fn rename_queue(
        &self,
        _from: &str,
        _to: &str,
    ) -> std::result::Result<(), qrusty::storage::RenameQueueError> {
        Ok(())
    }

    async fn push(&self, _msg: Message) -> anyhow::Result<String> {
        Ok("id".to_string())
    }

    async fn pop(
        &self,
        _queue: &str,
        _consumer_id: &str,
        _timeout_secs: u64,
    ) -> anyhow::Result<Option<Message>> {
        Ok(None)
    }

    async fn ack(
        &self,
        _queue: &str,
        _message_id: &str,
        _consumer_id: &str,
    ) -> anyhow::Result<bool> {
        Ok(false)
    }

    async fn nack(
        &self,
        _queue: &str,
        _message_id: &str,
        _consumer_id: &str,
    ) -> anyhow::Result<bool> {
        Ok(false)
    }

    async fn batch_ack(
        &self,
        _queue: &str,
        _consumer_id: &str,
        _message_ids: &[String],
    ) -> anyhow::Result<qrusty::message::BatchAckResult> {
        Ok(qrusty::message::BatchAckResult::default())
    }

    async fn batch_nack(
        &self,
        _queue: &str,
        _consumer_id: &str,
        _message_ids: &[String],
    ) -> anyhow::Result<qrusty::message::BatchNackResult> {
        Ok(qrusty::message::BatchNackResult::default())
    }

    async fn renew(
        &self,
        _queue: &str,
        _message_id: &str,
        _consumer_id: &str,
        _timeout_secs: u64,
    ) -> anyhow::Result<bool> {
        Ok(false)
    }

    async fn delete_queue(&self, _queue_name: &str) -> anyhow::Result<usize> {
        if self.delete_queue_error {
            return Err(anyhow::anyhow!("delete_queue failed"));
        }
        Ok(0)
    }

    async fn purge_queue(&self, _queue_name: &str) -> anyhow::Result<usize> {
        if self.purge_queue_error {
            return Err(anyhow::anyhow!("purge_queue failed"));
        }
        Ok(0)
    }

    async fn get_all_queue_stats(&self) -> anyhow::Result<Vec<QueueStats>> {
        if self.get_all_queue_stats_error {
            return Err(anyhow::anyhow!("get_all_queue_stats failed"));
        }
        Ok(Vec::new())
    }

    async fn get_queue_stats(&self, queue_name: &str) -> anyhow::Result<QueueStats> {
        if self.get_queue_stats_error {
            return Err(anyhow::anyhow!("get_queue_stats failed"));
        }
        Ok(QueueStats {
            name: queue_name.to_string(),
            available: 0,
            locked: 0,
            total: 0,
            config: QueueConfig::default(),
        })
    }

    async fn list_queues(&self) -> anyhow::Result<Vec<String>> {
        if self.list_queues_error {
            return Err(anyhow::anyhow!("list_queues failed"));
        }
        Ok(Vec::new())
    }

    async fn unlock_expired_messages(&self) -> anyhow::Result<usize> {
        Ok(0)
    }

    async fn force_unlock_queue(&self, _queue_name: &str) -> anyhow::Result<usize> {
        if self.force_unlock_queue_error {
            anyhow::bail!("force_unlock_queue error");
        }
        Ok(0)
    }
}

async fn make_request(api: ApiServer, method: &str, path: &str) -> (StatusCode, Value) {
    let app = api.router();

    let request = Request::builder()
        .method(method)
        .uri(path)
        .body(Body::empty())
        .unwrap();

    let response = app.oneshot(request).await.unwrap();
    let status = response.status();

    let body = axum::body::to_bytes(response.into_body(), usize::MAX)
        .await
        .unwrap();
    let body_str = String::from_utf8(body.to_vec()).unwrap();

    let json_body: Value = if body_str.is_empty() {
        Value::Null
    } else {
        serde_json::from_str(&body_str).unwrap_or(Value::String(body_str))
    };

    (status, json_body)
}

#[tokio::test]
async fn test_delete_queue_error_returns_500_with_json_error() {
    let storage = Arc::new(FailingStorage {
        delete_queue_error: true,
        ..Default::default()
    });
    let api = ApiServer::new(storage);

    let (status, body) = make_request(api, "DELETE", "/delete-queue/somequeue").await;
    assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
    assert!(body["error"]
        .as_str()
        .unwrap()
        .contains("Failed to delete queue"));
}

#[tokio::test]
async fn test_purge_queue_error_returns_500_with_json_error() {
    let storage = Arc::new(FailingStorage {
        purge_queue_error: true,
        ..Default::default()
    });
    let api = ApiServer::new(storage);

    let (status, body) = make_request(api, "POST", "/purge-queue/somequeue").await;
    assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
    assert!(body["error"]
        .as_str()
        .unwrap()
        .contains("Failed to purge queue"));
}

#[tokio::test]
async fn test_queue_stats_storage_error_returns_404() {
    let storage = Arc::new(FailingStorage {
        get_queue_stats_error: true,
        ..Default::default()
    });
    let api = ApiServer::new(storage);

    let (status, _body) = make_request(api, "GET", "/queue-stats/somequeue").await;
    assert_eq!(status, StatusCode::NOT_FOUND);
}

#[tokio::test]
async fn test_queues_storage_error_returns_500() {
    let storage = Arc::new(FailingStorage {
        list_queues_error: true,
        ..Default::default()
    });
    let api = ApiServer::new(storage);

    let (status, _body) = make_request(api, "GET", "/queues").await;
    assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
}

#[tokio::test]
async fn test_stats_storage_error_returns_500() {
    let storage = Arc::new(FailingStorage {
        get_all_queue_stats_error: true,
        ..Default::default()
    });
    let api = ApiServer::new(storage);

    let (status, _body) = make_request(api, "GET", "/stats").await;
    assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
}

/// Verifies: API-0014 — force-unlock returns 500 on storage error.
#[tokio::test]
async fn test_force_unlock_queue_error_returns_500() {
    let storage = Arc::new(FailingStorage {
        force_unlock_queue_error: true,
        ..Default::default()
    });
    let api = ApiServer::new(storage);

    let (status, body) = make_request(api, "POST", "/force-unlock/some_queue").await;
    assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
    assert!(body["error"].as_str().unwrap().contains("force-unlock"));
}