rabbitmq_management_client/api/
queue.rs1use crate::api::_generic::{handle_empty_response, handle_response};
2use crate::api::binding::RabbitMqBinding;
3use crate::api::options::pagination::RabbitMqPaginationRequest;
4use crate::api::RabbitMqPaginatedResponse;
5use crate::errors::RabbitMqClientError;
6use crate::RabbitMqClient;
7use async_trait::async_trait;
8use rust_decimal::Decimal;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12use super::options::RabbitMqRequestOptions;
13
/// Queue operations of the RabbitMQ management HTTP API.
///
/// This module implements the trait for [`RabbitMqClient`]; the endpoint notes
/// on each method describe that implementation.
///
/// NOTE(review): that implementation inserts `vhost`, `name` and `queue` into
/// the URL path verbatim, so a value such as the `/` vhost presumably has to be
/// passed already percent-encoded (`%2F`) — confirm against callers.
#[async_trait]
pub trait QueueApi {
    /// Lists queues, optionally limited to a single virtual host.
    ///
    /// `options` carries pagination, sorting and the `disable_stats` flag;
    /// `None` falls back to `RabbitMqRequestOptions::default()`.
    async fn list_queues(
        &self,
        vhost: Option<String>,
        options: Option<RabbitMqRequestOptions>,
    ) -> Result<RabbitMqPaginatedResponse<RabbitMqQueue>, RabbitMqClientError>;

    /// Fetches one queue (`GET /api/queues/{vhost}/{name}`).
    async fn get_queue(
        &self,
        vhost: String,
        name: String,
    ) -> Result<RabbitMqQueue, RabbitMqClientError>;

    /// Lists the bindings of one queue
    /// (`GET /api/queues/{vhost}/{name}/bindings`).
    async fn get_queue_bindings(
        &self,
        vhost: String,
        name: String,
    ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError>;

    /// Creates a queue that must not exist yet.
    ///
    /// The `RabbitMqClient` implementation first looks the queue up: a
    /// successful lookup yields `RabbitMqClientError::AlreadyExists`, a
    /// `NotFound` lookup proceeds to [`QueueApi::update_queue`], and any other
    /// lookup error is returned unchanged.
    async fn create_queue(
        &self,
        vhost: String,
        queue: String,
        request: RabbitMqQueueRequest,
    ) -> Result<(), RabbitMqClientError>;

    /// Sends the queue definition with `PUT /api/queues/{vhost}/{queue}`,
    /// without checking beforehand whether the queue exists.
    async fn update_queue(
        &self,
        vhost: String,
        queue: String,
        request: RabbitMqQueueRequest,
    ) -> Result<(), RabbitMqClientError>;

    /// Deletes a queue (`DELETE /api/queues/{vhost}/{name}`).
    async fn delete_queue(&self, vhost: String, name: String) -> Result<(), RabbitMqClientError>;

    /// Purges a queue by issuing `DELETE /api/queues/{vhost}/{name}/contents`.
    async fn purge_queue(&self, vhost: String, name: String) -> Result<(), RabbitMqClientError>;

    /// Posts `action` to `/api/queues/{vhost}/{queue}/actions`.
    async fn set_queue_actions(
        &self,
        vhost: String,
        queue: String,
        action: RabbitMqQueueAction,
    ) -> Result<(), RabbitMqClientError>;
}
59
60#[async_trait]
61impl QueueApi for RabbitMqClient {
62 #[tracing::instrument(skip(self))]
63 async fn list_queues(
64 &self,
65 vhost: Option<String>,
66 options: Option<RabbitMqRequestOptions>,
67 ) -> Result<RabbitMqPaginatedResponse<RabbitMqQueue>, RabbitMqClientError> {
68 let options: RabbitMqRequestOptions = options.unwrap_or_default();
69 let pagination: RabbitMqPaginationRequest = options.pagination.unwrap_or_default().into();
70
71 let response = self
72 .client
73 .request(
74 reqwest::Method::GET,
75 format!("{}/api/queues/{}", self.api_url, vhost.unwrap_or_default()),
76 )
77 .query(&pagination)
78 .query(&options.sorting)
79 .query(&[("disable_stats", options.disable_stats)])
80 .send()
81 .await?;
82
83 handle_response(response).await
84 }
85
86 #[tracing::instrument(skip(self))]
87 async fn get_queue(
88 &self,
89 vhost: String,
90 name: String,
91 ) -> Result<RabbitMqQueue, RabbitMqClientError> {
92 let response = self
93 .client
94 .request(
95 reqwest::Method::GET,
96 format!("{}/api/queues/{}/{}", self.api_url, vhost, name),
97 )
98 .send()
99 .await?;
100
101 handle_response(response).await
102 }
103
104 #[tracing::instrument(skip(self))]
105 async fn get_queue_bindings(
106 &self,
107 vhost: String,
108 name: String,
109 ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError> {
110 let response = self
111 .client
112 .request(
113 reqwest::Method::GET,
114 format!("{}/api/queues/{}/{}/bindings", self.api_url, vhost, name),
115 )
116 .send()
117 .await?;
118
119 handle_response(response).await
120 }
121
122 #[tracing::instrument(skip(self))]
123 async fn create_queue(
124 &self,
125 vhost: String,
126 queue: String,
127 request: RabbitMqQueueRequest,
128 ) -> Result<(), RabbitMqClientError> {
129 match self.get_queue(vhost.clone(), queue.clone()).await {
130 Ok(_) => Err(RabbitMqClientError::AlreadyExists(format!(
131 "{} queue",
132 queue
133 ))),
134 Err(e) => match e {
135 RabbitMqClientError::NotFound(_) => self.update_queue(vhost, queue, request).await,
136 _ => Err(e),
137 },
138 }
139 }
140
141 #[tracing::instrument(skip(self))]
142 async fn update_queue(
143 &self,
144 vhost: String,
145 queue: String,
146 request: RabbitMqQueueRequest,
147 ) -> Result<(), RabbitMqClientError> {
148 let response = self
149 .client
150 .request(
151 reqwest::Method::PUT,
152 format!("{}/api/queues/{}/{}", self.api_url, vhost, queue),
153 )
154 .json(&request)
155 .send()
156 .await?;
157
158 handle_empty_response(response).await
159 }
160
161 #[tracing::instrument(skip(self))]
162 async fn delete_queue(&self, vhost: String, name: String) -> Result<(), RabbitMqClientError> {
163 let response = self
164 .client
165 .request(
166 reqwest::Method::DELETE,
167 format!("{}/api/queues/{}/{}", self.api_url, vhost, name),
168 )
169 .send()
170 .await?;
171
172 handle_empty_response(response).await
173 }
174
175 #[tracing::instrument(skip(self))]
176 async fn purge_queue(&self, vhost: String, name: String) -> Result<(), RabbitMqClientError> {
177 let response = self
178 .client
179 .request(
180 reqwest::Method::DELETE,
181 format!("{}/api/queues/{}/{}/contents", self.api_url, vhost, name),
182 )
183 .send()
184 .await?;
185
186 handle_empty_response(response).await
187 }
188
189 #[tracing::instrument(skip(self))]
190 async fn set_queue_actions(
191 &self,
192 vhost: String,
193 queue: String,
194 action: RabbitMqQueueAction,
195 ) -> Result<(), RabbitMqClientError> {
196 let response = self
197 .client
198 .request(
199 reqwest::Method::POST,
200 format!("{}/api/queues/{}/{}/actions", self.api_url, vhost, queue),
201 )
202 .json(&RabbitMqQueueActionRequest { action })
203 .send()
204 .await?;
205
206 handle_empty_response(response).await
207 }
208}
209
/// A queue as deserialized from the queue endpoints of the management API.
///
/// Fields typed as `Option` may be missing from the payload; every other field
/// must be present for deserialization to succeed.
#[derive(Debug, Deserialize)]
pub struct RabbitMqQueue {
    /// Queue name.
    pub name: String,
    /// Value of the payload's `node` key (presumably the node hosting the
    /// queue — confirm against the server documentation).
    pub node: String,
    /// Queue arguments, keyed by argument name.
    pub arguments: HashMap<String, RabbitMqArgument>,
    /// Value of the payload's `state` key.
    pub state: String,
    /// Queue type; read from the payload's `type` key, renamed because `type`
    /// is a Rust keyword.
    #[serde(rename = "type")]
    pub kind: String,
    /// Virtual host the queue belongs to.
    pub vhost: String,
    /// Value of the payload's `auto_delete` flag.
    pub auto_delete: bool,
    /// Value of the payload's `durable` flag.
    pub durable: bool,
    /// Value of the payload's `exclusive` flag.
    pub exclusive: bool,
    // The remaining fields are optional in the payload. Presumably they are
    // statistics the server may omit (e.g. with `disable_stats`) — TODO confirm.
    /// Value of the `consumer_capacity` key, when present.
    pub consumer_capacity: Option<Decimal>,
    /// Value of the `consumer_utilisation` key, when present.
    pub consumer_utilisation: Option<Decimal>,
    /// Value of the `consumers` key, when present.
    pub consumers: Option<i64>,
    /// Value of the `messages` key, when present.
    pub messages: Option<i64>,
    /// Value of the `messages_ready` key, when present.
    pub messages_ready: Option<i64>,
    /// Value of the `messages_unacknowledged` key, when present.
    pub messages_unacknowledged: Option<i64>,
    /// Garbage-collection figures, when present.
    pub garbage_collection: Option<RabbitMqQueueGarbageCollection>,
    /// Message counters, when present.
    pub message_stats: Option<RabbitMqQueueMessageStats>,
}
231
232#[derive(Debug, Deserialize)]
233#[serde(untagged)]
234pub enum RabbitMqArgument {
235 String(String),
236 Decimal(Decimal),
237}
238
239#[derive(Debug, Deserialize)]
240pub struct RabbitMqQueueMessageStats {
241 #[serde(default)]
242 pub ack: i64,
243 #[serde(default)]
244 pub deliver: i64,
245 #[serde(default)]
246 pub deliver_get: i64,
247 #[serde(default)]
248 pub deliver_no_ack: i64,
249 #[serde(default)]
250 pub get: i64,
251 #[serde(default)]
252 pub get_empty: i64,
253 #[serde(default)]
254 pub get_no_ack: i64,
255 #[serde(default)]
256 pub publish: i64,
257 #[serde(default)]
258 pub redeliver: i64,
259}
260
261#[derive(Debug, Deserialize)]
262pub struct RabbitMqQueueGarbageCollection {
263 pub fullsweep_after: i64,
264 pub max_heap_size: i64,
265 pub min_bin_vheap_size: i64,
266 pub min_heap_size: i64,
267 pub minor_gcs: i64,
268}
269
270#[derive(Debug, Serialize)]
271pub struct RabbitMqQueueRequest {
272 pub auto_delete: bool,
273 pub durable: bool,
274 #[serde(skip_serializing_if = "Option::is_none")]
275 pub arguments: Option<HashMap<String, String>>,
276 #[serde(skip_serializing_if = "Option::is_none")]
277 pub node: Option<String>,
278}
279
/// JSON body posted by `set_queue_actions`; serializes as a single `action`
/// key.
#[derive(Debug, Serialize)]
pub struct RabbitMqQueueActionRequest {
    /// The action to request for the queue.
    pub action: RabbitMqQueueAction,
}
284
285#[derive(Debug, Serialize)]
286#[serde(rename_all = "snake_case")]
287pub enum RabbitMqQueueAction {
288 Sync,
289 CancelSync,
290}