rabbitmq_http_client 0.88.0

RabbitMQ HTTP API client
Documentation
// Copyright (C) 2023-2025 RabbitMQ Core Team (teamrabbitmq@gmail.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rabbitmq_http_client::{api::Client, commons::QueueType, requests::QueueParams};
use serde_json::{Map, Value, json};

use crate::test_helpers::{PASSWORD, USERNAME, async_rabbitmq_version_is_at_least, endpoint};

#[tokio::test]
async fn test_async_declare_and_redeclare_a_classic_queue() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vhost = "/";
    let name = "rust.tests.cq.69373293479827";

    let _ = rc.delete_queue(vhost, name, false).await;

    let result1 = rc.get_queue_info(vhost, name).await;
    assert!(result1.is_err());

    let mut map = Map::<String, Value>::new();
    map.insert("x-max-length".to_owned(), json!(10_000));
    // note: x-queue-type will be injected by QueueParams::new_durable_classic_queue
    let optional_args = Some(map);
    let params = QueueParams::new_durable_classic_queue(name, optional_args.clone());
    let result2 = rc.declare_queue(vhost, &params).await;
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    let params2 = QueueParams::new(name, QueueType::Classic, true, false, optional_args.clone());
    let result3 = rc.declare_queue(vhost, &params2).await;
    assert!(result3.is_ok(), "declare_queue returned {result3:?}");

    let _ = rc.delete_queue(vhost, name, false).await;
}

#[tokio::test]
async fn test_async_declare_a_quorum_queue() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vhost = "/";
    let name = "rust.tests.qq.182374982374";

    let _ = rc.delete_queue(vhost, name, false).await;

    let result1 = rc.get_queue_info(vhost, name).await;
    assert!(result1.is_err());

    let mut map = Map::<String, Value>::new();
    map.insert("x-max-length".to_owned(), json!(10_000));
    let optional_args = Some(map);
    let params = QueueParams::new_quorum_queue(name, optional_args);
    let result2 = rc.declare_queue(vhost, &params).await;
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    let _ = rc.delete_queue(vhost, name, false).await;
}

#[tokio::test]
async fn test_async_declare_a_stream_with_declare_queue() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vhost = "/";
    let name = "rust.tests.qq.927348926347988623";

    let _ = rc.delete_queue(vhost, name, false).await;

    let result1 = rc.get_queue_info(vhost, name).await;
    assert!(result1.is_err());

    let mut map = Map::<String, Value>::new();
    map.insert("x-max-length-bytes".to_owned(), json!(10_000_000));
    let optional_args = Some(map);
    let params = QueueParams::new_stream(name, optional_args);
    let result2 = rc.declare_queue(vhost, &params).await;
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    let _ = rc.delete_queue(vhost, name, false).await;
}

#[tokio::test]
async fn test_async_delete_queue() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vhost = "/";
    let name = "rust.tests.cq.982734982364982364896";

    let _ = rc.delete_queue(vhost, name, false).await;

    let result1 = rc.get_queue_info(vhost, name).await;
    assert!(result1.is_err());

    let params = QueueParams::new_durable_classic_queue(name, None);
    let result2 = rc.declare_queue(vhost, &params).await;
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    rc.delete_queue(vhost, name, false).await.unwrap();

    // idempotent delete should succeed
    rc.delete_queue(vhost, name, true).await.unwrap();

    // non-idempotent delete should fail
    assert!(rc.delete_queue(vhost, name, false).await.is_err());
    let result3 = rc.get_queue_info(vhost, name).await;
    assert!(result3.is_err());
}

#[tokio::test]
async fn test_async_list_all_queues() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);

    let vh_name = "/";

    let params = QueueParams::new_durable_classic_queue("rust.tests.cq.23487866", None);
    let result1 = rc.declare_queue(vh_name, &params).await;
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");

    crate::test_helpers::async_await_queue_metric_emission().await;

    let result2 = rc.list_queues().await;
    assert!(result2.is_ok(), "list_queues returned {result2:?}");

    rc.delete_queue(vh_name, params.name, false).await.unwrap();
}

#[tokio::test]
async fn test_async_list_queues_in_a_virtual_host() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);

    let vh_name = "/";

    let params = QueueParams::new_durable_classic_queue("rust.tests.cq.64692734867", None);
    let result1 = rc.declare_queue(vh_name, &params).await;
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");

    crate::test_helpers::async_await_queue_metric_emission().await;

    let result2 = rc.list_queues_in(vh_name).await;
    assert!(result2.is_ok(), "list_queues_in returned {result2:?}");

    rc.delete_queue(vh_name, params.name, false).await.unwrap();
}

#[tokio::test]
async fn test_async_list_queues_with_details() {
    // /api/queues/detailed endpoint was added in RabbitMQ 3.13
    if !async_rabbitmq_version_is_at_least(3, 13, 0).await {
        return;
    }

    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);

    let vh_name = "/";

    let params = QueueParams::new_durable_classic_queue("rust.tests.cq.detailed.92734827364", None);
    let result1 = rc.declare_queue(vh_name, &params).await;
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");

    crate::test_helpers::async_await_queue_metric_emission().await;

    let result2 = rc.list_queues_with_details().await;
    assert!(
        result2.is_ok(),
        "list_queues_with_details returned {result2:?}"
    );

    let detailed_queues = result2.unwrap();
    assert!(
        !detailed_queues.is_empty(),
        "Expected at least one queue in detailed list"
    );

    // Find our test queue in the results
    let test_queue = detailed_queues.iter().find(|q| q.name == params.name);
    assert!(
        test_queue.is_some(),
        "Expected to find our test queue in detailed results"
    );

    let queue = test_queue.unwrap();
    // Verify basic queue properties are present
    assert_eq!(queue.name, params.name);
    assert_eq!(queue.vhost, vh_name);
    assert_eq!(queue.durable, true);

    // More fields
    if let Some(gc) = &queue.garbage_collection {
        assert!(gc.fullsweep_after > 1000);
    }

    rc.delete_queue(vh_name, params.name, false).await.unwrap();
}