rabbitmq_management_client/api/
queue.rs

1use crate::api::_generic::{handle_empty_response, handle_response};
2use crate::api::binding::RabbitMqBinding;
3use crate::api::options::pagination::RabbitMqPaginationRequest;
4use crate::api::RabbitMqPaginatedResponse;
5use crate::errors::RabbitMqClientError;
6use crate::RabbitMqClient;
7use async_trait::async_trait;
8use rust_decimal::Decimal;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12use super::options::RabbitMqRequestOptions;
13
14#[async_trait]
15pub trait QueueApi {
16    async fn list_queues(
17        &self,
18        vhost: Option<String>,
19        options: Option<RabbitMqRequestOptions>,
20    ) -> Result<RabbitMqPaginatedResponse<RabbitMqQueue>, RabbitMqClientError>;
21
22    async fn get_queue(
23        &self,
24        vhost: String,
25        name: String,
26    ) -> Result<RabbitMqQueue, RabbitMqClientError>;
27
28    async fn get_queue_bindings(
29        &self,
30        vhost: String,
31        name: String,
32    ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError>;
33
34    async fn create_queue(
35        &self,
36        vhost: String,
37        queue: String,
38        request: RabbitMqQueueRequest,
39    ) -> Result<(), RabbitMqClientError>;
40
41    async fn update_queue(
42        &self,
43        vhost: String,
44        queue: String,
45        request: RabbitMqQueueRequest,
46    ) -> Result<(), RabbitMqClientError>;
47
48    async fn delete_queue(&self, vhost: String, name: String) -> Result<(), RabbitMqClientError>;
49
50    async fn purge_queue(&self, vhost: String, name: String) -> Result<(), RabbitMqClientError>;
51
52    async fn set_queue_actions(
53        &self,
54        vhost: String,
55        queue: String,
56        action: RabbitMqQueueAction,
57    ) -> Result<(), RabbitMqClientError>;
58}
59
60#[async_trait]
61impl QueueApi for RabbitMqClient {
62    #[tracing::instrument(skip(self))]
63    async fn list_queues(
64        &self,
65        vhost: Option<String>,
66        options: Option<RabbitMqRequestOptions>,
67    ) -> Result<RabbitMqPaginatedResponse<RabbitMqQueue>, RabbitMqClientError> {
68        let options: RabbitMqRequestOptions = options.unwrap_or_default();
69        let pagination: RabbitMqPaginationRequest = options.pagination.unwrap_or_default().into();
70
71        let response = self
72            .client
73            .request(
74                reqwest::Method::GET,
75                format!("{}/api/queues/{}", self.api_url, vhost.unwrap_or_default()),
76            )
77            .query(&pagination)
78            .query(&options.sorting)
79            .query(&[("disable_stats", options.disable_stats)])
80            .send()
81            .await?;
82
83        handle_response(response).await
84    }
85
86    #[tracing::instrument(skip(self))]
87    async fn get_queue(
88        &self,
89        vhost: String,
90        name: String,
91    ) -> Result<RabbitMqQueue, RabbitMqClientError> {
92        let response = self
93            .client
94            .request(
95                reqwest::Method::GET,
96                format!("{}/api/queues/{}/{}", self.api_url, vhost, name),
97            )
98            .send()
99            .await?;
100
101        handle_response(response).await
102    }
103
104    #[tracing::instrument(skip(self))]
105    async fn get_queue_bindings(
106        &self,
107        vhost: String,
108        name: String,
109    ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError> {
110        let response = self
111            .client
112            .request(
113                reqwest::Method::GET,
114                format!("{}/api/queues/{}/{}/bindings", self.api_url, vhost, name),
115            )
116            .send()
117            .await?;
118
119        handle_response(response).await
120    }
121
122    #[tracing::instrument(skip(self))]
123    async fn create_queue(
124        &self,
125        vhost: String,
126        queue: String,
127        request: RabbitMqQueueRequest,
128    ) -> Result<(), RabbitMqClientError> {
129        match self.get_queue(vhost.clone(), queue.clone()).await {
130            Ok(_) => Err(RabbitMqClientError::AlreadyExists(format!(
131                "{} queue",
132                queue
133            ))),
134            Err(e) => match e {
135                RabbitMqClientError::NotFound(_) => self.update_queue(vhost, queue, request).await,
136                _ => Err(e),
137            },
138        }
139    }
140
141    #[tracing::instrument(skip(self))]
142    async fn update_queue(
143        &self,
144        vhost: String,
145        queue: String,
146        request: RabbitMqQueueRequest,
147    ) -> Result<(), RabbitMqClientError> {
148        let response = self
149            .client
150            .request(
151                reqwest::Method::PUT,
152                format!("{}/api/queues/{}/{}", self.api_url, vhost, queue),
153            )
154            .json(&request)
155            .send()
156            .await?;
157
158        handle_empty_response(response).await
159    }
160
161    #[tracing::instrument(skip(self))]
162    async fn delete_queue(&self, vhost: String, name: String) -> Result<(), RabbitMqClientError> {
163        let response = self
164            .client
165            .request(
166                reqwest::Method::DELETE,
167                format!("{}/api/queues/{}/{}", self.api_url, vhost, name),
168            )
169            .send()
170            .await?;
171
172        handle_empty_response(response).await
173    }
174
175    #[tracing::instrument(skip(self))]
176    async fn purge_queue(&self, vhost: String, name: String) -> Result<(), RabbitMqClientError> {
177        let response = self
178            .client
179            .request(
180                reqwest::Method::DELETE,
181                format!("{}/api/queues/{}/{}/contents", self.api_url, vhost, name),
182            )
183            .send()
184            .await?;
185
186        handle_empty_response(response).await
187    }
188
189    #[tracing::instrument(skip(self))]
190    async fn set_queue_actions(
191        &self,
192        vhost: String,
193        queue: String,
194        action: RabbitMqQueueAction,
195    ) -> Result<(), RabbitMqClientError> {
196        let response = self
197            .client
198            .request(
199                reqwest::Method::POST,
200                format!("{}/api/queues/{}/{}/actions", self.api_url, vhost, queue),
201            )
202            .json(&RabbitMqQueueActionRequest { action })
203            .send()
204            .await?;
205
206        handle_empty_response(response).await
207    }
208}
209
210#[derive(Debug, Deserialize)]
211pub struct RabbitMqQueue {
212    pub name: String,
213    pub node: String,
214    pub arguments: HashMap<String, RabbitMqArgument>,
215    pub state: String,
216    #[serde(rename = "type")]
217    pub kind: String,
218    pub vhost: String,
219    pub auto_delete: bool,
220    pub durable: bool,
221    pub exclusive: bool,
222    pub consumer_capacity: Option<Decimal>,
223    pub consumer_utilisation: Option<Decimal>,
224    pub consumers: Option<i64>,
225    pub messages: Option<i64>,
226    pub messages_ready: Option<i64>,
227    pub messages_unacknowledged: Option<i64>,
228    pub garbage_collection: Option<RabbitMqQueueGarbageCollection>,
229    pub message_stats: Option<RabbitMqQueueMessageStats>,
230}
231
232#[derive(Debug, Deserialize)]
233#[serde(untagged)]
234pub enum RabbitMqArgument {
235    String(String),
236    Decimal(Decimal),
237}
238
239#[derive(Debug, Deserialize)]
240pub struct RabbitMqQueueMessageStats {
241    #[serde(default)]
242    pub ack: i64,
243    #[serde(default)]
244    pub deliver: i64,
245    #[serde(default)]
246    pub deliver_get: i64,
247    #[serde(default)]
248    pub deliver_no_ack: i64,
249    #[serde(default)]
250    pub get: i64,
251    #[serde(default)]
252    pub get_empty: i64,
253    #[serde(default)]
254    pub get_no_ack: i64,
255    #[serde(default)]
256    pub publish: i64,
257    #[serde(default)]
258    pub redeliver: i64,
259}
260
261#[derive(Debug, Deserialize)]
262pub struct RabbitMqQueueGarbageCollection {
263    pub fullsweep_after: i64,
264    pub max_heap_size: i64,
265    pub min_bin_vheap_size: i64,
266    pub min_heap_size: i64,
267    pub minor_gcs: i64,
268}
269
270#[derive(Debug, Serialize)]
271pub struct RabbitMqQueueRequest {
272    pub auto_delete: bool,
273    pub durable: bool,
274    #[serde(skip_serializing_if = "Option::is_none")]
275    pub arguments: Option<HashMap<String, String>>,
276    #[serde(skip_serializing_if = "Option::is_none")]
277    pub node: Option<String>,
278}
279
280#[derive(Debug, Serialize)]
281pub struct RabbitMqQueueActionRequest {
282    pub action: RabbitMqQueueAction,
283}
284
285#[derive(Debug, Serialize)]
286#[serde(rename_all = "snake_case")]
287pub enum RabbitMqQueueAction {
288    Sync,
289    CancelSync,
290}