// rabbitmq_http_client 0.88.0
//
// RabbitMQ HTTP API client
// Documentation
// Copyright (C) 2023-2025 RabbitMQ Core Team (teamrabbitmq@gmail.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rabbitmq_http_client::{
    blocking_api::Client,
    commons::{PaginationParams, QueueType},
    requests::QueueParams,
};
use serde_json::{Map, Value, json};

use crate::test_helpers::{PASSWORD, USERNAME, endpoint, rabbitmq_version_is_at_least};

/// Declares a classic queue via `QueueParams::new_durable_classic_queue`, then declares
/// the same queue name again via the general `QueueParams::new` constructor with the
/// same optional arguments. Both declarations are expected to succeed.
#[test]
fn test_blocking_declare_and_redeclare_a_classic_queue() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vhost = "/";
    let name = "rust.tests.cq.69373293479827";

    // Best-effort removal of leftovers from an earlier run; the outcome is deliberately ignored.
    let _ = rc.delete_queue(vhost, name, false);

    // The queue must not be visible before it is declared.
    let result1 = rc.get_queue_info(vhost, name);
    assert!(result1.is_err());

    let mut map = Map::<String, Value>::new();
    map.insert("x-max-length".to_owned(), json!(10_000));
    let optional_args = Some(map);
    // Cloned here because the same arguments are reused for the second declaration below.
    let params = QueueParams::new_durable_classic_queue(name, optional_args.clone());
    let result2 = rc.declare_queue(vhost, &params);
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    // This is the last use of `optional_args`, so it is moved rather than cloned.
    let params2 = QueueParams::new(name, QueueType::Classic, true, false, optional_args);
    let result3 = rc.declare_queue(vhost, &params2);
    assert!(result3.is_ok(), "declare_queue returned {result3:?}");

    // Best-effort cleanup.
    let _ = rc.delete_queue(vhost, name, false);
}

/// Declares a quorum queue carrying an `x-max-length` argument and expects the
/// declaration to succeed.
#[test]
fn test_blocking_declare_a_quorum_queue() {
    let api_endpoint = endpoint();
    let client = Client::new(&api_endpoint, USERNAME, PASSWORD);
    let (vhost, queue_name) = ("/", "rust.tests.qq.182374982374");

    // Start from a clean slate; the result of this delete is intentionally ignored.
    let _ = client.delete_queue(vhost, queue_name, false);

    // Looking up the queue before declaring it must fail.
    let result1 = client.get_queue_info(vhost, queue_name);
    assert!(result1.is_err());

    let mut x_args = Map::<String, Value>::new();
    x_args.insert(String::from("x-max-length"), json!(10_000));
    let params = QueueParams::new_quorum_queue(queue_name, Some(x_args));

    let result2 = client.declare_queue(vhost, &params);
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    // Best-effort cleanup.
    let _ = client.delete_queue(vhost, queue_name, false);
}

/// Declares a stream through the generic `declare_queue` call, passing an
/// `x-max-length-bytes` argument, and expects the declaration to succeed.
#[test]
fn test_blocking_declare_a_stream_with_declare_queue() {
    let api_endpoint = endpoint();
    let client = Client::new(&api_endpoint, USERNAME, PASSWORD);
    let (vhost, stream_name) = ("/", "rust.tests.qq.927348926347988623");

    // Start from a clean slate; the result of this delete is intentionally ignored.
    let _ = client.delete_queue(vhost, stream_name, false);

    // Looking up the stream before declaring it must fail.
    let result1 = client.get_queue_info(vhost, stream_name);
    assert!(result1.is_err());

    let mut x_args = Map::<String, Value>::new();
    x_args.insert(String::from("x-max-length-bytes"), json!(10_000_000));
    let params = QueueParams::new_stream(stream_name, Some(x_args));

    let result2 = client.declare_queue(vhost, &params);
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    // Best-effort cleanup.
    let _ = client.delete_queue(vhost, stream_name, false);
}

/// Exercises queue deletion: deleting an existing queue, an idempotent delete of the
/// now-missing queue (must succeed), and a non-idempotent delete of it (must fail).
#[test]
fn test_blocking_delete_queue() {
    let api_endpoint = endpoint();
    let client = Client::new(&api_endpoint, USERNAME, PASSWORD);
    let (vhost, queue_name) = ("/", "rust.tests.cq.982734982364982364896");

    // Start from a clean slate; the result of this delete is intentionally ignored.
    let _ = client.delete_queue(vhost, queue_name, false);

    let result1 = client.get_queue_info(vhost, queue_name);
    assert!(result1.is_err());

    let params = QueueParams::new_durable_classic_queue(queue_name, None);
    let result2 = client.declare_queue(vhost, &params);
    assert!(result2.is_ok(), "declare_queue returned {result2:?}");

    // The queue exists at this point, so a plain delete must work.
    client.delete_queue(vhost, queue_name, false).unwrap();

    // The queue is gone: the idempotent variant should still succeed ...
    client.delete_queue(vhost, queue_name, true).unwrap();

    // ... whereas the non-idempotent variant should report an error.
    let repeated_delete = client.delete_queue(vhost, queue_name, false);
    assert!(repeated_delete.is_err());

    let result3 = client.get_queue_info(vhost, queue_name);
    assert!(result3.is_err());
}

/// Declares a classic queue, waits for queue metrics to be emitted, and checks that
/// listing all queues succeeds.
#[test]
fn test_blocking_list_all_queues() {
    let api_endpoint = endpoint();
    let client = Client::new(&api_endpoint, USERNAME, PASSWORD);
    let vhost = "/";

    let params = QueueParams::new_durable_classic_queue("rust.tests.cq.23487866", None);
    let result1 = client.declare_queue(vhost, &params);
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");

    // Pause via the shared test helper before querying the listing endpoint.
    crate::test_helpers::await_queue_metric_emission();

    let result2 = client.list_queues();
    assert!(result2.is_ok(), "list_queues returned {result2:?}");

    // Cleanup must succeed because the queue was declared above.
    client.delete_queue(vhost, params.name, false).unwrap();
}

/// Declares a classic queue, waits for queue metrics to be emitted, and checks that
/// listing the queues of a single virtual host succeeds.
#[test]
fn test_blocking_list_queues_in_a_virtual_host() {
    let api_endpoint = endpoint();
    let client = Client::new(&api_endpoint, USERNAME, PASSWORD);
    let vhost = "/";

    let params = QueueParams::new_durable_classic_queue("rust.tests.cq.64692734867", None);
    let result1 = client.declare_queue(vhost, &params);
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");

    // Pause via the shared test helper before querying the listing endpoint.
    crate::test_helpers::await_queue_metric_emission();

    let result2 = client.list_queues_in(vhost);
    assert!(result2.is_ok(), "list_queues_in returned {result2:?}");

    // Cleanup must succeed because the queue was declared above.
    client.delete_queue(vhost, params.name, false).unwrap();
}

/// Declares a classic queue and verifies that it shows up in the detailed queue
/// listing with the expected name, virtual host and durability.
///
/// The test is a no-op on RabbitMQ versions older than 3.13.
#[test]
pub fn test_blocking_list_queues_with_details() {
    // /api/queues/detailed endpoint was added in RabbitMQ 3.13
    if !rabbitmq_version_is_at_least(3, 13, 0) {
        return;
    }

    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);

    let vh_name = "/";

    let params =
        QueueParams::new_durable_classic_queue("rust.tests.cq.detailed.blocking.18273486", None);
    let result1 = rc.declare_queue(vh_name, &params);
    assert!(result1.is_ok(), "declare_queue returned {result1:?}");

    // Pause via the shared test helper before querying the listing endpoint.
    crate::test_helpers::await_queue_metric_emission();

    let result2 = rc.list_queues_with_details();
    assert!(
        result2.is_ok(),
        "list_queues_with_details returned {result2:?}"
    );

    let detailed_queues = result2.unwrap();
    assert!(
        !detailed_queues.is_empty(),
        "Expected at least one queue in detailed list"
    );

    // `expect` replaces the former `is_some()` assertion followed by `unwrap()`;
    // the failure message is unchanged.
    let queue = detailed_queues
        .iter()
        .find(|q| q.name == params.name)
        .expect("Expected to find our test queue in detailed results");

    assert_eq!(queue.name, params.name);
    assert_eq!(queue.vhost, vh_name);
    assert!(queue.durable);
    // Garbage collection details are optional; only validate them when present.
    if let Some(gc) = &queue.garbage_collection {
        assert!(gc.fullsweep_after > 1000);
    }

    rc.delete_queue(vh_name, params.name, false).unwrap();
}

/// Requests the first page (page size 10) of the queue listing, both across all
/// virtual hosts and for the "/" virtual host, and expects both calls to succeed.
#[test]
fn test_blocking_list_queues_paged() {
    let api_endpoint = endpoint();
    let client = Client::new(&api_endpoint, USERNAME, PASSWORD);
    let vhost = "/";
    let first_page = PaginationParams::first_page(10);

    // Listing that is not scoped to a virtual host.
    let result = client.list_queues_paged(&first_page);
    assert!(result.is_ok(), "list_queues_paged returned {result:?}");

    // Listing scoped to one virtual host, reusing the same pagination parameters.
    let result_in = client.list_queues_in_paged(vhost, &first_page);
    assert!(
        result_in.is_ok(),
        "list_queues_in_paged returned {result_in:?}"
    );
}

/// Declares a quorum queue and checks that it is reported by both
/// `list_quorum_queues` and `list_quorum_queues_in`.
#[test]
fn test_blocking_list_quorum_queues() {
    let api_endpoint = endpoint();
    let client = Client::new(&api_endpoint, USERNAME, PASSWORD);
    let (vhost, queue_name) = ("/", "rust.tests.qq.list_quorum_queues.blocking");

    // Idempotent delete of leftovers from a previous run; the outcome is ignored.
    let _ = client.delete_queue(vhost, queue_name, true);

    let params = QueueParams::new_quorum_queue(queue_name, None);
    client.declare_queue(vhost, &params).unwrap();

    // Pause via the shared test helper before querying the listing endpoints.
    crate::test_helpers::await_queue_metric_emission();

    // Listing that is not scoped to a virtual host.
    let result = client.list_quorum_queues();
    assert!(result.is_ok(), "list_quorum_queues returned {result:?}");
    let listed_everywhere = result.unwrap().iter().any(|q| q.name == queue_name);
    assert!(listed_everywhere);

    // Listing scoped to the virtual host the queue was declared in.
    let result_in = client.list_quorum_queues_in(vhost);
    assert!(
        result_in.is_ok(),
        "list_quorum_queues_in returned {result_in:?}"
    );
    let listed_in_vhost = result_in.unwrap().iter().any(|q| q.name == queue_name);
    assert!(listed_in_vhost);

    client.delete_queue(vhost, queue_name, false).unwrap();
}

/// Declares a classic queue and checks that it is reported by both
/// `list_classic_queues` and `list_classic_queues_in`.
#[test]
fn test_blocking_list_classic_queues() {
    let api_endpoint = endpoint();
    let client = Client::new(&api_endpoint, USERNAME, PASSWORD);
    let (vhost, queue_name) = ("/", "rust.tests.cq.list_classic_queues.blocking");

    // Idempotent delete of leftovers from a previous run; the outcome is ignored.
    let _ = client.delete_queue(vhost, queue_name, true);

    let params = QueueParams::new_durable_classic_queue(queue_name, None);
    client.declare_queue(vhost, &params).unwrap();

    // Pause via the shared test helper before querying the listing endpoints.
    crate::test_helpers::await_queue_metric_emission();

    // Listing that is not scoped to a virtual host.
    let result = client.list_classic_queues();
    assert!(result.is_ok(), "list_classic_queues returned {result:?}");
    let listed_everywhere = result.unwrap().iter().any(|q| q.name == queue_name);
    assert!(listed_everywhere);

    // Listing scoped to the virtual host the queue was declared in.
    let result_in = client.list_classic_queues_in(vhost);
    assert!(
        result_in.is_ok(),
        "list_classic_queues_in returned {result_in:?}"
    );
    let listed_in_vhost = result_in.unwrap().iter().any(|q| q.name == queue_name);
    assert!(listed_in_vhost);

    client.delete_queue(vhost, queue_name, false).unwrap();
}

/// Declares a stream and checks that it is reported by both `list_streams` and
/// `list_streams_in`.
#[test]
fn test_blocking_list_streams() {
    let api_endpoint = endpoint();
    let client = Client::new(&api_endpoint, USERNAME, PASSWORD);
    let (vhost, stream_name) = ("/", "rust.tests.stream.list_streams.blocking");

    // Idempotent delete of leftovers from a previous run; the outcome is ignored.
    let _ = client.delete_queue(vhost, stream_name, true);

    let params = QueueParams::new_stream(stream_name, None);
    client.declare_queue(vhost, &params).unwrap();

    // Pause via the shared test helper before querying the listing endpoints.
    crate::test_helpers::await_queue_metric_emission();

    // Listing that is not scoped to a virtual host.
    let result = client.list_streams();
    assert!(result.is_ok(), "list_streams returned {result:?}");
    let listed_everywhere = result.unwrap().iter().any(|q| q.name == stream_name);
    assert!(listed_everywhere);

    // Listing scoped to the virtual host the stream was declared in.
    let result_in = client.list_streams_in(vhost);
    assert!(result_in.is_ok(), "list_streams_in returned {result_in:?}");
    let listed_in_vhost = result_in.unwrap().iter().any(|q| q.name == stream_name);
    assert!(listed_in_vhost);

    client.delete_queue(vhost, stream_name, false).unwrap();
}

/// Declares three classic queues, deletes them with a single `delete_queues` call,
/// verifies that none of them can be looked up afterwards, and finally checks that
/// an idempotent bulk delete of the already-removed queues succeeds.
#[test]
fn test_blocking_delete_queues_bulk() {
    let endpoint = endpoint();
    let rc = Client::new(&endpoint, USERNAME, PASSWORD);
    let vhost = "/";
    let names = [
        "rust.tests.cq.bulk.1",
        "rust.tests.cq.bulk.2",
        "rust.tests.cq.bulk.3",
    ];

    for name in &names {
        // Idempotent delete of leftovers from a previous run; the outcome is ignored.
        let _ = rc.delete_queue(vhost, name, true);
        let params = QueueParams::new_durable_classic_queue(name, None);
        rc.declare_queue(vhost, &params).unwrap();
    }

    let result = rc.delete_queues(vhost, &names, false);
    assert!(result.is_ok(), "delete_queues returned {result:?}");

    for name in &names {
        let info = rc.get_queue_info(vhost, name);
        assert!(info.is_err(), "Queue {name} should have been deleted");
    }

    // The queues no longer exist, so only the idempotent variant is expected to succeed.
    let result_idempotent = rc.delete_queues(vhost, &names, true);
    assert!(
        result_idempotent.is_ok(),
        "Idempotent delete_queues should succeed, but returned {result_idempotent:?}"
    );
}