// rabbitmq_http_client 0.88.0
//
// RabbitMQ HTTP API client
// Documentation
// Copyright (C) 2023-2025 RabbitMQ Core Team (teamrabbitmq@gmail.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{
    commons::{PaginationParams, QueueType},
    path,
    requests::{QueueParams, StreamParams},
    responses::{self, QueueOps, XArguments},
};
use serde_json::{Map, Value, json};

use super::client::{Client, Result};
use std::fmt::Display;

impl<E, U, P> Client<E, U, P>
where
    E: Display,
    U: Display,
    P: Display,
{
    /// Returns every queue and stream across the cluster.
    /// The [Queues Guide](https://www.rabbitmq.com/docs/queues) and the [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) cover the underlying concepts.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_queues(&self) -> Result<Vec<responses::QueueInfo>> {
        let endpoint = "queues";
        self.get_api_request(endpoint)
    }

    /// Returns queues and streams across the cluster, one page at a time.
    ///
    /// When `params` produces no query string, this behaves exactly like
    /// `list_queues` and returns the unpaginated listing.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_queues_paged(
        &self,
        params: &PaginationParams,
    ) -> Result<Vec<responses::QueueInfo>> {
        if let Some(query) = params.to_query_string() {
            self.get_paginated_api_request("queues", &query)
        } else {
            // Nothing to paginate by: fall back to the plain listing.
            self.list_queues()
        }
    }

    /// Returns every queue and stream that belongs to the given virtual host.
    /// The [Queues Guide](https://www.rabbitmq.com/docs/queues) and the [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) cover the underlying concepts.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_queues_in(&self, virtual_host: &str) -> Result<Vec<responses::QueueInfo>> {
        let queues: Vec<responses::QueueInfo> =
            self.get_api_request(path!("queues", virtual_host))?;
        Ok(queues)
    }

    /// Returns queues and streams of the given virtual host, one page at a time.
    ///
    /// When `params` produces no query string, this behaves exactly like
    /// `list_queues_in` and returns the unpaginated listing for the virtual host.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_queues_in_paged(
        &self,
        virtual_host: &str,
        params: &PaginationParams,
    ) -> Result<Vec<responses::QueueInfo>> {
        if let Some(query) = params.to_query_string() {
            self.get_paginated_api_request(path!("queues", virtual_host), &query)
        } else {
            // Nothing to paginate by: fall back to the plain per-vhost listing.
            self.list_queues_in(virtual_host)
        }
    }

    /// Returns every queue and stream across the cluster with more queue metrics than
    /// [`Self::list_queues`] provides.
    /// The [Queues Guide](https://www.rabbitmq.com/docs/queues) and the [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) cover the underlying concepts.
    ///
    /// Requires RabbitMQ 3.13.0 or a later version.
    ///
    /// Requires the `monitoring` user tag. Does not modify state.
    /// Restricted monitoring users (the `monitoring` tag with only the `read` and `configure` permissions) can use it as well.
    pub fn list_queues_with_details(&self) -> Result<Vec<responses::DetailedQueueInfo>> {
        let endpoint = "queues/detailed";
        self.get_api_request(endpoint)
    }

    /// Fetches the details of a single queue or stream, identified by virtual host and name.
    /// See the [Queues Guide](https://www.rabbitmq.com/docs/queues) to learn more.
    ///
    /// Requires the `management` user tag and `read` permissions on the queue. Does not modify state.
    pub fn get_queue_info(&self, virtual_host: &str, name: &str) -> Result<responses::QueueInfo> {
        // Issue the GET, then decode the JSON response body into the typed response.
        let queue: responses::QueueInfo = self
            .http_get(path!("queues", virtual_host, name), None, None)?
            .json()?;
        Ok(queue)
    }

    /// Fetches the details of a single stream, identified by virtual host and name.
    /// See the [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more.
    ///
    /// This delegates to `get_queue_info` with the same arguments.
    ///
    /// Requires the `management` user tag and `read` permissions on the queue. Does not modify state.
    pub fn get_stream_info(&self, virtual_host: &str, name: &str) -> Result<responses::QueueInfo> {
        let stream = self.get_queue_info(virtual_host, name)?;
        Ok(stream)
    }

    /// Declares a [queue](https://www.rabbitmq.com/docs/queues) named `params.name` in the given virtual host.
    ///
    /// Declaring a queue that already exists may fail unless the existing queue's
    /// parameters are equivalent to the ones supplied here.
    ///
    /// Requires the `management` user tag and `configure` permissions on the queue.
    pub fn declare_queue(&self, vhost: &str, params: &QueueParams<'_>) -> Result<()> {
        // The parameters themselves are sent as the request payload.
        self.put_api_request(path!("queues", vhost, params.name), params)?;
        Ok(())
    }

    /// Declares a [RabbitMQ stream](https://www.rabbitmq.com/docs/streams) named `params.name`
    /// in the given virtual host.
    ///
    /// Streams are a durable, replicated, long-lived data structure in RabbitMQ designed for
    /// high-throughput scenarios. Consuming from a stream, unlike consuming from a traditional
    /// queue, is non-destructive; stream data is removed according to the effective stream
    /// retention policy.
    ///
    /// The optional arguments are assembled from `params.arguments` first, followed by the
    /// dedicated fields: a non-empty `expiration`, then `max_length_bytes` and
    /// `max_segment_length_bytes` when they are set.
    ///
    /// Declaring a stream that already exists may fail unless the existing stream's
    /// parameters are equivalent to the ones supplied here.
    ///
    /// Requires the `management` user tag and `configure` permissions on the queue.
    pub fn declare_stream(&self, vhost: &str, params: &StreamParams<'_>) -> Result<()> {
        let mut x_args: Map<String, Value> = Map::new();

        // Seed the map with a copy of the caller-provided arguments, if there are any.
        if let Some(extra) = &params.arguments {
            x_args.extend(extra.clone());
        }

        // An empty expiration means no maximum age is sent.
        if !params.expiration.is_empty() {
            let max_age = json!(params.expiration);
            x_args.insert(XArguments::X_MAX_AGE_KEY.to_owned(), max_age);
        }
        if let Some(limit) = params.max_length_bytes {
            x_args.insert(XArguments::X_MAX_LENGTH_BYTES_KEY.to_owned(), json!(limit));
        }
        if let Some(segment_size) = params.max_segment_length_bytes {
            let key = XArguments::X_STREAM_MAX_SEGMENT_SIZE_BYTES_KEY.to_owned();
            x_args.insert(key, json!(segment_size));
        }

        // A stream is declared through the queues endpoint using stream-typed queue parameters.
        let stream_params = QueueParams::new_stream(params.name, Some(x_args));
        let _put_response = self.http_put(
            path!("queues", vhost, params.name),
            &stream_params,
            None,
            None,
        )?;
        Ok(())
    }

    /// Deletes the named queue from the given virtual host.
    ///
    /// Deleting a queue that does not exist is an error unless `idempotently` is `true`.
    ///
    /// Requires the `management` user tag and `configure` permissions on the queue.
    pub fn delete_queue(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> {
        self.delete_api_request_with_optional_not_found(
            path!("queues", vhost, name),
            idempotently,
        )?;
        Ok(())
    }

    /// Deletes several queues from the given virtual host, one request per name, in order.
    ///
    /// With `idempotently` set to `true`, queues that do not exist are silently skipped.
    /// With `idempotently` set to `false`, the first non-existent queue fails the operation.
    /// Processing stops at the first error, so the remaining names are not attempted.
    ///
    /// Requires the `management` user tag and `configure` permissions on the queues.
    pub fn delete_queues(&self, vhost: &str, names: &[&str], idempotently: bool) -> Result<()> {
        names
            .iter()
            .try_for_each(|name| self.delete_queue(vhost, name, idempotently))
    }

    /// Deletes the named stream from the given virtual host.
    ///
    /// Deleting a stream that does not exist is an error unless `idempotently` is `true`.
    /// This delegates to `delete_queue` with the same arguments.
    ///
    /// Requires the `management` user tag and `configure` permissions on the queue.
    pub fn delete_stream(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> {
        self.delete_queue(vhost, name, idempotently)?;
        Ok(())
    }

    /// Purges a queue: removes every message in the "ready for delivery" state while
    /// leaving the queue itself in place.
    ///
    /// Messages that were already delivered and are awaiting acknowledgement are not
    /// removed by a purge.
    ///
    /// Requires the `management` user tag and `read` permissions on the queue.
    pub fn purge_queue(&self, virtual_host: &str, name: &str) -> Result<()> {
        // The response is discarded; only success or failure is reported.
        let _ = self.http_delete(path!("queues", virtual_host, name, "contents"), None, None)?;
        Ok(())
    }

    /// Shortcut for declaring a durable quorum queue without any optional arguments.
    ///
    /// Requires the `management` user tag and `configure` permissions on the queue.
    pub fn declare_quorum_queue(&self, vhost: &str, name: &str) -> Result<()> {
        self.declare_queue(vhost, &QueueParams::new_quorum_queue(name, None))
    }

    /// Shortcut for declaring a durable classic queue without any optional arguments.
    ///
    /// Requires the `management` user tag and `configure` permissions on the queue.
    pub fn declare_classic_queue(&self, vhost: &str, name: &str) -> Result<()> {
        self.declare_queue(vhost, &QueueParams::new_durable_classic_queue(name, None))
    }

    /// Returns the quorum queues found across the cluster.
    ///
    /// The full queue listing is fetched first and then narrowed down client-side.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_quorum_queues(&self) -> Result<Vec<responses::QueueInfo>> {
        let mut queues = self.list_queues()?;
        queues.retain(|q| q.queue_type() == QueueType::Quorum);
        Ok(queues)
    }

    /// Returns the quorum queues found in the given virtual host.
    ///
    /// The virtual host's full queue listing is fetched first and then narrowed down client-side.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_quorum_queues_in(&self, virtual_host: &str) -> Result<Vec<responses::QueueInfo>> {
        let mut queues = self.list_queues_in(virtual_host)?;
        queues.retain(|q| q.queue_type() == QueueType::Quorum);
        Ok(queues)
    }

    /// Returns the classic queues found across the cluster.
    ///
    /// The full queue listing is fetched first and then narrowed down client-side.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_classic_queues(&self) -> Result<Vec<responses::QueueInfo>> {
        let mut queues = self.list_queues()?;
        queues.retain(|q| q.queue_type() == QueueType::Classic);
        Ok(queues)
    }

    /// Returns the classic queues found in the given virtual host.
    ///
    /// The virtual host's full queue listing is fetched first and then narrowed down client-side.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_classic_queues_in(&self, virtual_host: &str) -> Result<Vec<responses::QueueInfo>> {
        let mut queues = self.list_queues_in(virtual_host)?;
        queues.retain(|q| q.queue_type() == QueueType::Classic);
        Ok(queues)
    }

    /// Returns the streams found across the cluster.
    ///
    /// The full queue listing is fetched first and then narrowed down client-side.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_streams(&self) -> Result<Vec<responses::QueueInfo>> {
        let mut queues = self.list_queues()?;
        queues.retain(|q| q.queue_type() == QueueType::Stream);
        Ok(queues)
    }

    /// Returns the streams found in the given virtual host.
    ///
    /// The virtual host's full queue listing is fetched first and then narrowed down client-side.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_streams_in(&self, virtual_host: &str) -> Result<Vec<responses::QueueInfo>> {
        let mut queues = self.list_queues_in(virtual_host)?;
        queues.retain(|q| q.queue_type() == QueueType::Stream);
        Ok(queues)
    }

    /// Returns the streams contained in one page of the cluster-wide queue listing.
    ///
    /// Note: the server paginates over all queues, and only afterwards are non-stream
    /// entries dropped client-side. A page can therefore hold fewer results than the
    /// page size even when it is not the last page.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_streams_paged(
        &self,
        params: &PaginationParams,
    ) -> Result<Vec<responses::QueueInfo>> {
        let mut page = self.list_queues_paged(params)?;
        page.retain(|q| q.queue_type() == QueueType::Stream);
        Ok(page)
    }

    /// Returns the streams contained in one page of the given virtual host's queue listing.
    ///
    /// Note: the server paginates over all queues, and only afterwards are non-stream
    /// entries dropped client-side. A page can therefore hold fewer results than the
    /// page size even when it is not the last page.
    ///
    /// Requires the `management` user tag and `read` permissions on the queues. Does not modify state.
    pub fn list_streams_in_paged(
        &self,
        virtual_host: &str,
        params: &PaginationParams,
    ) -> Result<Vec<responses::QueueInfo>> {
        let mut page = self.list_queues_in_paged(virtual_host, params)?;
        page.retain(|q| q.queue_type() == QueueType::Stream);
        Ok(page)
    }
}