// qrusty 0.20.7
//
// A trusty priority queue server built with Rust
// Documentation
// tests/memory_integration_tests.rs
// Verifies: SYS-0013, PER-0011

//! Integration tests exercising the full API stack with MemoryStorage backend.

use axum::body::Body;
use axum::http::{Request, StatusCode};
use qrusty::api::ApiServer;
use qrusty::memory_storage::MemoryStorage;
use serde_json::{json, Value};
use std::sync::Arc;
use tower::ServiceExt;

/// Sets up a DEBUG-level `tracing` subscriber that writes through the test
/// writer.
///
/// The outcome of `try_init` is discarded on purpose, so a failed
/// initialization (for example because another test in the same binary got
/// there first) never aborts the calling test.
fn init_tracing() {
    let builder = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_test_writer();

    // Best effort only: the result is intentionally ignored.
    let _ = builder.try_init();
}

/// Drives one request through the router produced by `api` and returns the
/// response status together with the decoded response body.
///
/// * `method` / `path` - HTTP method and URI of the request.
/// * `body` - optional JSON payload; `None` sends an empty body. The
///   `content-type: application/json` header is set in both cases.
///
/// The response body is decoded as follows: an empty body yields
/// `Value::Null`, valid JSON yields the parsed value, and anything else is
/// returned verbatim as `Value::String`.
///
/// Panics if the request cannot be built, the service call fails, the body
/// cannot be read, or the body is not valid UTF-8.
async fn make_request(
    api: ApiServer,
    method: &str,
    path: &str,
    body: Option<Value>,
) -> (StatusCode, Value) {
    let app = api.router();

    // Serialize the optional JSON payload; no payload means an empty body.
    let payload = match body {
        Some(body_json) => Body::from(body_json.to_string()),
        None => Body::empty(),
    };
    let request = Request::builder()
        .method(method)
        .uri(path)
        .header("content-type", "application/json")
        .body(payload)
        .unwrap();

    let response = app.oneshot(request).await.unwrap();
    let status = response.status();

    // Read the whole body (no size cap) and require it to be UTF-8 text.
    let raw = axum::body::to_bytes(response.into_body(), usize::MAX)
        .await
        .unwrap();
    let text = String::from_utf8(raw.to_vec()).unwrap();

    let decoded: Value = if text.is_empty() {
        Value::Null
    } else {
        match serde_json::from_str(&text) {
            Ok(parsed) => parsed,
            // Non-JSON responses are handed back as a plain string.
            Err(_) => Value::String(text),
        }
    };

    (status, decoded)
}

/// Happy-path round trip against the in-memory backend: create a queue,
/// publish one message, consume it, acknowledge it, and finally check that the
/// queue statistics report a total of zero.
#[tokio::test]
async fn test_api_with_memory_storage_publish_consume_ack() {
    init_tracing();
    let backend = Arc::new(MemoryStorage::new());
    let api = ApiServer::new(backend);

    // 1. Create the queue.
    let create_payload = json!({"name": "test_q", "config": {"ordering": "MaxFirst"}});
    let (create_status, _) =
        make_request(api.clone(), "POST", "/create-queue", Some(create_payload)).await;
    assert_eq!(create_status, StatusCode::OK);

    // 2. Publish a single message and remember the id the server assigned.
    let publish_payload = json!({
        "queue": "test_q",
        "priority": 100,
        "payload": "{\"order_id\": 1}",
        "max_retries": 3
    });
    let (publish_status, published) =
        make_request(api.clone(), "POST", "/publish", Some(publish_payload)).await;
    assert_eq!(publish_status, StatusCode::OK);
    let message_id = published["id"].as_str().unwrap().to_string();

    // 3. Consume: the message handed out must be the one just published.
    let consume_payload = json!({"consumer_id": "worker-1", "timeout_seconds": 30});
    let (consume_status, consumed) = make_request(
        api.clone(),
        "POST",
        "/consume/test_q",
        Some(consume_payload),
    )
    .await;
    assert_eq!(consume_status, StatusCode::OK);
    assert_eq!(consumed["id"].as_str().unwrap(), &message_id);

    // 4. Acknowledge it as the same consumer.
    let ack_path = format!("/ack/test_q/{}", message_id);
    let ack_payload = json!({"consumer_id": "worker-1"});
    let (ack_status, _) = make_request(api.clone(), "POST", &ack_path, Some(ack_payload)).await;
    assert_eq!(ack_status, StatusCode::OK);

    // 5. After the ack the queue stats must report no messages.
    let (stats_status, queue_stats) =
        make_request(api.clone(), "GET", "/queue-stats/test_q", None).await;
    assert_eq!(stats_status, StatusCode::OK);
    assert_eq!(queue_stats["total"].as_u64().unwrap(), 0);
}

/// Checks that `GET /health` on an API backed by `MemoryStorage` answers
/// `200 OK` with a JSON body whose `status` field is `"ok"`.
#[tokio::test]
async fn test_api_with_memory_storage_health() {
    init_tracing();
    let backend = Arc::new(MemoryStorage::new());
    let api = ApiServer::new(backend);

    let (code, health) = make_request(api, "GET", "/health", None).await;

    assert_eq!(code, StatusCode::OK);
    assert_eq!(health["status"].as_str().unwrap(), "ok");
}

/// Verifies that `GET /stats` lists every queue with its message count.
///
/// Setup creates queue `q1` and publishes two messages to it. Each setup
/// request is asserted to succeed so that a broken create/publish endpoint is
/// reported at the failing step instead of surfacing later as a misleading
/// mismatch in the stats assertions.
#[tokio::test]
async fn test_api_with_memory_storage_stats() {
    init_tracing();
    let storage = Arc::new(MemoryStorage::new());
    let api = ApiServer::new(storage);

    // Create the queue.
    let (status, _) = make_request(
        api.clone(),
        "POST",
        "/create-queue",
        Some(json!({"name": "q1", "config": {"ordering": "MaxFirst"}})),
    )
    .await;
    assert_eq!(status, StatusCode::OK, "setup: creating queue q1 failed");

    // Populate it with two messages of different priority.
    for (priority, payload) in [(1, "a"), (2, "b")] {
        let (status, _) = make_request(
            api.clone(),
            "POST",
            "/publish",
            Some(json!({
                "queue": "q1",
                "priority": priority,
                "payload": payload,
                "max_retries": 3
            })),
        )
        .await;
        assert_eq!(
            status,
            StatusCode::OK,
            "setup: publishing payload {:?} failed",
            payload
        );
    }

    let (status, stats) = make_request(api.clone(), "GET", "/stats", None).await;
    assert_eq!(status, StatusCode::OK);

    // Exactly one queue is expected, holding both published messages.
    let queues = stats["queues"].as_array().unwrap();
    assert_eq!(queues.len(), 1);
    assert_eq!(queues[0]["name"].as_str().unwrap(), "q1");
    assert_eq!(queues[0]["total"].as_u64().unwrap(), 2);
}

/// Exercises lock expiry directly on `MemoryStorage`: a message is pushed and
/// popped by consumer `c1` with a timeout argument of 1, the test waits two
/// seconds, calls `unlock_expired_messages`, and then expects a second
/// consumer to receive the very same message.
#[tokio::test]
async fn test_timeout_monitor_with_memory_storage() {
    use qrusty::api::StorageApi;
    use tokio::time::Duration;

    init_tracing();
    let backend = Arc::new(MemoryStorage::new());

    // Generate the id up front so it can be compared after redelivery.
    let msg_id = uuid::Uuid::new_v4().to_string();
    let message = qrusty::message::Message {
        id: msg_id.clone(),
        queue: "monitor_q".to_string(),
        priority: qrusty::message::Priority::Numeric(1),
        payload: "test".to_string(),
        created_at: chrono::Utc::now(),
        locked_until: None,
        locked_by: None,
        retry_count: 0,
        max_retries: 3,
        payload_ref: None,
        payload_hash: None,
    };

    // Enqueue, then take the message as consumer "c1" with a 1s lock timeout.
    backend.push(message).await.unwrap();
    backend.pop("monitor_q", "c1", 1).await.unwrap().unwrap();

    // Let the lock run out.
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Trigger the unlock by hand (stands in for what timeout_monitor does).
    let released = backend.unlock_expired_messages().await.unwrap();
    assert_eq!(released, 1);

    // A different consumer must now be handed the original message.
    let redelivered = backend.pop("monitor_q", "c2", 30).await.unwrap().unwrap();
    assert_eq!(redelivered.id, msg_id);
}