A Rust Client for the RabbitMQ HTTP API
This is a Rust client for the RabbitMQ HTTP API.
This is not an AMQP 0-9-1 client (see amqprs), an AMQP 1.0 client (see fe2o3-amqp), or a RabbitMQ Stream protocol client (see rabbitmq-stream-rust-client).
Project Maturity
This library is reasonably mature.
Before 1.0.0, breaking API changes can and will be introduced.
Supported RabbitMQ Series
This library targets RabbitMQ 4.x and 3.13.x.
All older series have reached End of Life.
Dependency
Async Client
= { = "0.83.0", = ["core", "async"] }
Blocking Client
= { = "0.83.0", = ["core", "blocking"] }
With Tabled Support
= { = "0.83.0", = ["core", "async", "tabled"] }
= { = "0.83.0", = ["core", "blocking", "tabled"] }
With Zeroize Support
The zeroize feature provides a Password type that offers zeroisation
of credentials in memory for security-demanding use cases:
= { = "0.83.0", = ["core", "async", "zeroize"] }
TLS Backend
By default, this client uses native-tls.
To use rustls instead:
= { = "0.83.0", = false, = ["core", "async", "rustls"] }
Add hickory-dns to use the Hickory DNS resolver.
Async Client
The async client is in rabbitmq_http_client::api.
Instantiate a Client
use Client;
let endpoint = "http://localhost:15672/api";
let rc = new;
Using ClientBuilder
ClientBuilder#build validates the endpoint scheme (http or https)
and returns a Result:
use ClientBuilder;
let rc = new
.with_endpoint
.with_basic_auth_credentials
.build?;
ClientBuilder: Production Defaults
with_recommended_defaults sets a 60-second request timeout,
3 retry attempts with a 1-second delay, and disables redirects:
use ClientBuilder;
let rc = new
.with_endpoint
.with_basic_auth_credentials
.with_recommended_defaults
.build?;
ClientBuilder: Timeouts, Retries, and Redirects
These can be configured individually:
use Duration;
use ;
let rc = new
.with_endpoint
.with_basic_auth_credentials
// TCP connection timeout (connection establishment + TLS handshake)
.with_connect_timeout
// overall request/response cycle timeout
.with_request_timeout
// retry up to 3 times with a 500 ms fixed delay
.with_retry_settings
// the RabbitMQ HTTP API does not use redirects;
// disabling them is an SSRF hardening measure
.with_redirect_policy
.build?;
These settings have no effect when a custom HTTP client is passed
via ClientBuilder#with_client; configure the custom client directly.
Reachability Probe
probe_reachability checks whether the node is reachable and authentication, authorization
steps succeed.
It returns ReachabilityProbeOutcome, not Result, because both outcomes are expected:
use Client;
use ReachabilityProbeOutcome;
let rc = new;
match rc.probe_reachability.await
List Cluster Nodes
let nodes = rc.list_nodes.await?;
Get Cluster Name
let name = rc.get_cluster_name.await?;
Cluster Tags
Cluster tags are arbitrary key-value pairs for the cluster:
use ;
let tags = rc.get_cluster_tags.await?;
let mut new_tags = new;
new_tags.insert;
rc.set_cluster_tags.await?;
rc.clear_cluster_tags.await?;
Node Memory Footprint
let footprint = rc.get_node_memory_footprint.await?;
Returns a per-category memory footprint breakdown, in bytes and as percentages.
Virtual Host Operations
Virtual hosts group and isolate resources.
let vhosts = rc.list_vhosts.await?;
let vhost = rc.get_vhost.await?;
Create Virtual Host
use ;
let params = VirtualHostParams ;
rc.create_vhost.await?;
Delete Virtual Host
rc.delete_vhost.await?;
User Operations
let users = rc.list_users.await?;
let user = rc.current_user.await?;
Create User
Uses password hashing with salted SHA-256:
use ;
let salt = salt;
let password_hash = base64_encoded_salted_password_hash_sha256;
let params = UserParams ;
rc.create_user.await?;
SHA-512 is also supported:
use ;
let salt = salt;
let hash = base64_encoded_salted_password_hash_sha512;
// or via the algorithm enum
let hash = SHA512.salt_and_hash?;
Delete User
rc.delete_user.await?;
Connection Operations
let connections = rc.list_connections.await?;
Close Connection
rc.close_connection.await?;
Queue Operations
let queues = rc.list_queues.await?;
let queues_in_vhost = rc.list_queues_in.await?;
let info = rc.get_queue_info.await?;
Queues can also be listed by type:
let quorum_queues = rc.list_quorum_queues.await?;
let classic_queues = rc.list_classic_queues_in.await?;
let streams = rc.list_streams.await?;
Declare a Classic Queue
use QueueParams;
let params = new_durable_classic_queue;
rc.declare_queue.await?;
Declare a Quorum Queue
Quorum queues are replicated, data safety-oriented queues based on the Raft consensus algorithm.
use QueueParams;
use ;
let mut args = new;
args.insert;
let params = new_quorum_queue;
rc.declare_queue.await?;
Type-Safe Queue Arguments with XArgumentsBuilder
XArgumentsBuilder is a type-safe alternative to raw Map<String, Value> for
optional queue arguments:
use ;
let args = new
.max_length
.dead_letter_exchange
.dead_letter_strategy
.delivery_limit
.single_active_consumer
.build;
let params = new_quorum_queue;
rc.declare_queue.await?;
Declare a Stream
Streams are persistent, replicated append-only logs with non-destructive consumer semantics.
Using StreamParams:
use StreamParams;
let params = with_expiration_and_length_limit;
rc.declare_stream.await?;
Or using QueueParams:
use QueueParams;
use ;
let mut args = new;
args.insert;
let params = new_stream;
rc.declare_queue.await?;
Purge Queue
rc.purge_queue.await?;
Delete Queue
rc.delete_queue.await?;
Batch Queue Deletion
rc.delete_queues.await?;
Pagination
Some list operations support pagination:
use PaginationParams;
let page = first_page;
let queues = rc.list_queues_paged.await?;
if let Some = page.next_page
Paginated variants: list_queues_paged, list_queues_in_paged, list_connections_paged.
Exchange Operations
let exchanges = rc.list_exchanges.await?;
let exchanges_in_vhost = rc.list_exchanges_in.await?;
Declare an Exchange
use ;
let params = ExchangeParams ;
rc.declare_exchange.await?;
Delete Exchange
rc.delete_exchange.await?;
Binding Operations
Bindings connect exchanges to queues or other exchanges.
let bindings = rc.list_bindings.await?;
let bindings_in_vhost = rc.list_bindings_in.await?;
let queue_bindings = rc.list_queue_bindings.await?;
Bind Queue to Exchange
rc.bind_queue.await?;
Bind Exchange to Exchange
rc.bind_exchange.await?;
Delete Binding
use ;
let params = BindingDeletionParams ;
rc.delete_binding.await?;
Permission Operations
use Permissions;
let params = Permissions ;
rc.declare_permissions.await?;
Policy Operations
Policies dynamically configure queue and exchange properties using pattern matching.
let policies = rc.list_policies.await?;
let policies_in_vhost = rc.list_policies_in.await?;
Declare a Policy
use ;
use ;
let mut definition = new;
definition.insert;
let params = PolicyParams ;
rc.declare_policy.await?;
Delete Policy
rc.delete_policy.await?;
Shovel Operations
Dynamic shovels move messages between queues, potentially across clusters.
use MessageTransferAcknowledgementMode;
use ;
let params = Amqp091ShovelParams ;
rc.declare_amqp091_shovel.await?;
let shovels = rc.list_shovels.await?;
let shovels_in_vhost = rc.list_shovels_in.await?;
rc.delete_shovel.await?;
AMQP 1.0 shovels use declare_amqp10_shovel.
Federation Operations
Federation replicates exchanges and queues across clusters.
use FederationUpstreamParams;
let params = FederationUpstreamParams ;
rc.declare_federation_upstream.await?;
let upstreams = rc.list_federation_upstreams.await?;
let links = rc.list_federation_links.await?;
rc.delete_federation_upstream.await?;
Runtime Parameters
Runtime parameters store per-vhost plugin settings such as federation upstream and shovel configurations.
use RuntimeParameterDefinition;
use ;
// set a max-connections limit on a virtual host
let mut value = new;
value.insert;
let param = RuntimeParameterDefinition ;
rc.upsert_runtime_parameter.await?;
let params = rc.list_runtime_parameters.await?;
rc.clear_runtime_parameter.await?;
Global Runtime Parameters
Global runtime parameters are cluster-wide, not scoped to a virtual host.
use GlobalRuntimeParameterDefinition;
use ;
// tag the cluster with metadata
let mut tags = new;
tags.insert;
tags.insert;
let mut value = new;
value.insert;
let param = GlobalRuntimeParameterDefinition ;
rc.upsert_global_runtime_parameter.await?;
let params = rc.list_global_runtime_parameters.await?;
rc.clear_global_runtime_parameter.await?;
Definition Operations
Definitions contain schema, topology, and user metadata for export and import.
let definitions = rc.export_cluster_wide_definitions.await?;
let vhost_definitions = rc.export_vhost_definitions.await?;
Import Definitions
let defs: Value = from_str?;
rc.import_definitions.await?;
Definition Transformers
Exported definitions can be transformed before import, e.g. to migrate from classic mirrored queues to quorum queues:
use ;
let mut defs = rc.export_cluster_wide_definitions_as_data.await?;
let chain = new;
chain.apply;
Available transformers:
| Transformer | Description |
|---|---|
PrepareForQuorumQueueMigration |
Strips classic mirrored queue policy keys and incompatible x-arguments |
StripCmqKeysFromPolicies |
Removes only the classic mirrored queue-related keys from policies |
DropEmptyPolicies |
Removes policies with empty definitions (use after stripping CMQ keys) |
ExcludeUsers |
Removes all users from the definition set |
ExcludePermissions |
Removes all permissions from the definition set |
ExcludeRuntimeParameters |
Removes all runtime parameters from the definition set |
ExcludePolicies |
Removes all policies from the definition set |
ObfuscateUsernames |
Replaces usernames and passwords with dummy values |
Virtual host-level equivalents (PrepareForQuorumQueueMigrationVhost, StripCmqKeysFromVhostPolicies,
DropEmptyVhostPolicies) use VirtualHostTransformationChain.
Health Checks
let alarms = rc.health_check_cluster_wide_alarms.await?;
let quorum_critical = rc.health_check_if_node_is_quorum_critical.await?;
let port_check = rc.health_check_port_listener.await?;
let protocol_check = rc.health_check_protocol_listener.await?;
Feature Flags
Feature flags gate new functionality that requires cluster-wide coordination.
let flags = rc.list_feature_flags.await?;
rc.enable_feature_flag.await?;
rc.enable_all_stable_feature_flags.await?;
Deprecated Features
let all = rc.list_all_deprecated_features.await?;
let in_use = rc.list_deprecated_features_in_use.await?;
Rebalance Queue Leaders
Redistributes quorum queue and stream leaders across cluster nodes.
rc.rebalance_queue_leaders.await?;
Idempotent Deletes
All delete_* and clear_* functions accept an idempotently: bool argument.
When true, 404 Not Found responses are silently ignored:
// will not fail if the queue does not exist
rc.delete_queue.await?;
Error Classification
HttpClientError has methods for classifying errors:
match rc.delete_queue.await
Predicates: is_not_found, is_already_exists, is_unauthorized, is_forbidden,
is_client_error, is_server_error, is_connection_error, is_timeout, is_tls_handshake_error.
For further detail: status_code, url, error_details.
URI Builder for Federation and Shovel URIs
UriBuilder builds AMQP URIs with TLS parameters
for federation and shovel connections:
use ;
use TlsPeerVerificationMode;
let uri = new
.unwrap
.with_tls_peer_verification
.with_ca_cert_file
.build
.unwrap;
// group TLS settings for reuse across multiple URIs
let tls = with_verification
.ca_cert_file
.client_cert_file
.client_key_file;
let uri = new
.unwrap
.replace
.build
.unwrap;
Tanzu RabbitMQ: Schema Definition Sync and Warm Standby Replication
Tanzu RabbitMQ Schema Definition Sync (SDS) and Warm Standby Replication (WSR):
rc.enable_schema_definition_sync_on_node.await?;
rc.disable_schema_definition_sync_on_node.await?;
Blocking Client
The blocking client is in rabbitmq_http_client::blocking_api. It has the same API as the async client but without async/await.
Instantiate a Client
use Client;
let endpoint = "http://localhost:15672/api";
let rc = new;
Using ClientBuilder
use ClientBuilder;
let rc = new
.with_endpoint
.with_basic_auth_credentials
.build?;
List Cluster Nodes
let nodes = rc.list_nodes?;
Virtual Host Operations
let vhosts = rc.list_vhosts?;
Create Virtual Host
use ;
let params = VirtualHostParams ;
rc.create_vhost?;
Delete Virtual Host
rc.delete_vhost?;
User Operations
let users = rc.list_users?;
Create User
use ;
let salt = salt;
let password_hash = base64_encoded_salted_password_hash_sha256;
let params = UserParams ;
rc.create_user?;
Connection Operations
let connections = rc.list_connections?;
Queue Operations
let queues = rc.list_queues?;
let queues_in_vhost = rc.list_queues_in?;
Declare a Queue
use QueueParams;
let params = new_durable_classic_queue;
rc.declare_queue?;
Declare a Quorum Queue
use QueueParams;
use ;
let mut args = new;
args.insert;
let params = new_quorum_queue;
rc.declare_queue?;
Purge Queue
rc.purge_queue?;
Delete Queue
rc.delete_queue?;
Exchange Operations
let exchanges = rc.list_exchanges?;
Declare an Exchange
use ;
let params = ExchangeParams ;
rc.declare_exchange?;
Delete Exchange
rc.delete_exchange?;
Binding Operations
let bindings = rc.list_bindings?;
let queue_bindings = rc.list_queue_bindings?;
Bind Queue to Exchange
rc.bind_queue?;
Permission Operations
use Permissions;
let params = Permissions ;
rc.declare_permissions?;
Policy Operations
use ;
use ;
let mut definition = new;
definition.insert;
let params = PolicyParams ;
rc.declare_policy?;
Definition Operations
let definitions = rc.export_cluster_wide_definitions?;
Health Checks
let alarms = rc.health_check_cluster_wide_alarms?;
TLS Support
This client makes few assumptions about TLS configuration.
Use the reqwest::ClientBuilder to configure TLS, then pass the client to this library's ClientBuilder::with_client:
use Client as HTTPClient;
use ClientBuilder;
// Configure TLS using reqwest's ClientBuilder
let mut b = builder
.user_agent
.min_tls_version
.danger_accept_invalid_certs
.danger_accept_invalid_hostnames;
// Add a CA certificate bundle file to the list of trusted roots
// for x.509 peer verification
b = b.add_root_certificate;
let httpc = b.build?;
// Use the TLS port (15671) instead of the default HTTP port (15672)
let endpoint = "https://rabbitmq.example.com:15671/api";
// Pass the pre-configured HTTP client to this library's ClientBuilder
let client = new
.with_endpoint
.with_basic_auth_credentials
.with_client
.build?;
Choose the certificate store for x.509 peer verification and the minimum TLS version.
Defaults
By default, this client uses native-tls.
Trusted CA certificates are managed via OS-specific mechanisms (Keychain on macOS, openssl directories on Linux).
Combined Examples
1. Setting Up an Application Environment
Create an isolated vhost with a dedicated user, topology, and permissions:
use ;
async
2. Blue-Green Deployment Migration
Set up queue federation for migrating from an old cluster to a new one. This RabbitMQ blog post covers the approach in detail.
use ;
use ;
// green: the new cluster
// blue_uri: e.g., amqp://user:pass@blue-cluster:5672/vhost
async
async
3. Health Monitoring
use Client;
async
4. Backup and Restore Definitions
use Client;
async
5. Event Topology with Dead-Lettering
Set up a fan-out pattern with dead-lettering for failed messages:
use ;
use ;
async
6. Pre-Upgrade Health Verification
Before a rolling upgrade:
use Client;
async
7. Connection and Channel Audit
use Client;
async
8. Multi-Tenant Setup
Provision isolated environments for multiple tenants:
use ;
async
9. Teardown Test Environment
Clean up after integration tests:
use Client;
async
License
This crate, rabbitmq-http-api-client-rs, is dual-licensed under the Apache Software License 2.0 and the MIT license.