1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Copyright (C) 2023-2025 RabbitMQ Core Team (teamrabbitmq@gmail.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{commons::PaginationParams, path, responses};
use reqwest::{
StatusCode,
header::{HeaderMap, HeaderValue},
};
use super::client::{Client, Result};
use std::fmt::Display;
impl<E, U, P> Client<E, U, P>
where
E: Display,
U: Display,
P: Display,
{
/// Lists all AMQP 1.0 and 0-9-1 client connections across the cluster.
/// See [Connections Guide](https://www.rabbitmq.com/docs/connections) to learn more.
///
/// Requires the `management` user tag. Does not modify state.
pub fn list_connections(&self) -> Result<Vec<responses::Connection>> {
self.get_api_request("connections")
}
/// Lists connections with pagination.
///
/// Requires the `management` user tag. Does not modify state.
pub fn list_connections_paged(
&self,
params: &PaginationParams,
) -> Result<Vec<responses::Connection>> {
match params.to_query_string() {
Some(query) => self.get_paginated_api_request("connections", &query),
None => self.list_connections(),
}
}
/// Returns information about a connection.
///
/// Connection name is usually obtained from `crate::responses::Connection` or `crate::responses::UserConnection`,
/// e.g. via `Client#list_connections`, `Client#list_connections_in`, `Client#list_user_connections`.
///
/// Requires the `management` user tag. Does not modify state.
pub fn get_connection_info(&self, name: &str) -> Result<responses::Connection> {
self.get_api_request(path!("connections", name))
}
/// Returns information about a stream connection.
///
/// Connection name is usually obtained from `crate::responses::Connection` or `crate::responses::UserConnection`,
/// e.g. via `Client#list_stream_connections`, `Client#list_stream_connections_in`, `Client#list_user_connections`.
///
/// Requires the `management` user tag. Does not modify state.
pub fn get_stream_connection_info(
&self,
virtual_host: &str,
name: &str,
) -> Result<responses::Connection> {
self.get_api_request(path!("stream", "connections", virtual_host, name))
}
/// Closes a connection with an optional reason.
///
/// The reason will be passed on in the connection error to the client and will be logged on the RabbitMQ end.
///
/// Requires the `management` user tag.
pub fn close_connection(
&self,
name: &str,
reason: Option<&str>,
idempotently: bool,
) -> Result<()> {
let excludes = if idempotently {
Some(StatusCode::NOT_FOUND)
} else {
None
};
let mut headers = HeaderMap::new();
if let Some(value) = reason {
let hdr = HeaderValue::from_str(value)?;
headers.insert("X-Reason", hdr);
}
self.http_delete_with_headers(path!("connections", name), headers, excludes, None)?;
Ok(())
}
/// Closes all connections for a user with an optional reason.
///
/// The reason will be passed on in the connection error to the client and will be logged on the RabbitMQ end.
///
/// This is en equivalent of listing all connections of a user with `Client#list_user_connections` and then
/// closing them one by one.
///
/// Requires the `administrator` user tag.
pub fn close_user_connections(
&self,
username: &str,
reason: Option<&str>,
idempotently: bool,
) -> Result<()> {
let excludes = if idempotently {
Some(StatusCode::NOT_FOUND)
} else {
None
};
let mut headers = HeaderMap::new();
if let Some(value) = reason {
let hdr = HeaderValue::from_str(value)?;
headers.insert("X-Reason", hdr);
}
self.http_delete_with_headers(
path!("connections", "username", username),
headers,
excludes,
None,
)?;
Ok(())
}
/// Lists all connections in the given virtual host.
/// See [Connections Guide](https://www.rabbitmq.com/docs/connections) to learn more.
///
/// Requires the `management` user tag. Does not modify state.
pub fn list_connections_in(&self, virtual_host: &str) -> Result<Vec<responses::Connection>> {
self.get_api_request(path!("vhosts", virtual_host, "connections"))
}
/// Lists all connections of a specific user.
/// See [Connection Guide](https://www.rabbitmq.com/docs/connections) to learn more.
///
/// Requires the `management` user tag. Does not modify state.
pub fn list_user_connections(&self, username: &str) -> Result<Vec<responses::UserConnection>> {
self.get_api_request(path!("connections", "username", username))
}
/// Lists all RabbitMQ Stream Protocol client connections across the cluster.
/// See [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more.
///
/// Requires the `management` user tag. Does not modify state.
pub fn list_stream_connections(&self) -> Result<Vec<responses::Connection>> {
self.get_api_request("stream/connections")
}
/// Lists RabbitMQ Stream Protocol client connections in the given virtual host.
/// See [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more.
///
/// Requires the `management` user tag. Does not modify state.
pub fn list_stream_connections_in(
&self,
virtual_host: &str,
) -> Result<Vec<responses::Connection>> {
self.get_api_request(path!("stream", "connections", virtual_host))
}
}