rabbitmq_management_client/api/
exchange.rs

1use crate::api::_generic::{handle_empty_response, handle_response};
2use crate::api::binding::RabbitMqBinding;
3use crate::api::{RabbitMqPaginatedResponse, RabbitMqPagination, RabbitMqPaginationFilter};
4use crate::errors::RabbitMqClientError;
5use crate::RabbitMqClient;
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8
9use super::options::pagination::RabbitMqPaginationRequest;
10use super::options::RabbitMqRequestOptions;
11
/// Asynchronous operations on the `/api/exchanges` endpoints of the RabbitMQ
/// management HTTP API.
///
/// Every method reports failures as a [`RabbitMqClientError`].
#[async_trait]
pub trait ExchangeApi {
    /// Lists exchanges as a paginated response.
    ///
    /// * `vhost` - when `Some`, the listing is requested for that virtual host;
    ///   when `None`, no virtual host is put in the request path.
    /// * `options` - optional pagination, sorting and `disable_stats` settings;
    ///   `None` means the defaults of [`RabbitMqRequestOptions`].
    async fn list_exchanges(
        &self,
        vhost: Option<String>,
        options: Option<RabbitMqRequestOptions>,
    ) -> Result<RabbitMqPaginatedResponse<RabbitMqExchange>, RabbitMqClientError>;

    /// Fetches the single exchange named `exchange` in virtual host `vhost`.
    async fn get_exchange(
        &self,
        vhost: String,
        exchange: String,
    ) -> Result<RabbitMqExchange, RabbitMqClientError>;

    /// Creates the exchange `exchange` in `vhost` from `request`.
    ///
    /// The implementation in this file first looks for an exchange with the
    /// same name and returns `RabbitMqClientError::AlreadyExists` if it finds
    /// one; otherwise it delegates to [`ExchangeApi::update_exchange`].
    async fn create_exchange(
        &self,
        vhost: String,
        exchange: String,
        request: RabbitMqExchangeRequest,
    ) -> Result<(), RabbitMqClientError>;

    /// Sends `request` as the definition of exchange `exchange` in `vhost`
    /// (an HTTP `PUT` in the implementation in this file), without any prior
    /// existence check.
    async fn update_exchange(
        &self,
        vhost: String,
        exchange: String,
        request: RabbitMqExchangeRequest,
    ) -> Result<(), RabbitMqClientError>;

    /// Deletes the exchange `exchange` from virtual host `vhost`.
    async fn delete_exchange(
        &self,
        vhost: String,
        exchange: String,
    ) -> Result<(), RabbitMqClientError>;

    /// Lists the bindings served by the exchange's `bindings/source` endpoint
    /// (presumably the bindings in which this exchange is the source; confirm
    /// against the RabbitMQ HTTP API reference).
    async fn list_source_bindings(
        &self,
        vhost: String,
        exchange: String,
    ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError>;

    /// Lists the bindings served by the exchange's `bindings/destination`
    /// endpoint (presumably the bindings in which this exchange is the
    /// destination; confirm against the RabbitMQ HTTP API reference).
    async fn list_destination_bindings(
        &self,
        vhost: String,
        exchange: String,
    ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError>;
}
58
59#[async_trait]
60impl ExchangeApi for RabbitMqClient {
61    async fn list_exchanges(
62        &self,
63        vhost: Option<String>,
64        options: Option<RabbitMqRequestOptions>,
65    ) -> Result<RabbitMqPaginatedResponse<RabbitMqExchange>, RabbitMqClientError> {
66        let options: RabbitMqRequestOptions = options.unwrap_or_default();
67        let pagination: RabbitMqPaginationRequest = options.pagination.unwrap_or_default().into();
68
69        let response = self
70            .client
71            .request(
72                reqwest::Method::GET,
73                format!(
74                    "{}/api/exchanges/{}",
75                    self.api_url,
76                    vhost.unwrap_or_default()
77                ),
78            )
79            .query(&pagination)
80            .query(&options.sorting)
81            .query(&[("disable_stats", options.disable_stats)])
82            .send()
83            .await?;
84
85        handle_response(response).await
86    }
87
88    async fn get_exchange(
89        &self,
90        vhost: String,
91        exchange: String,
92    ) -> Result<RabbitMqExchange, RabbitMqClientError> {
93        let response = self
94            .client
95            .request(
96                reqwest::Method::GET,
97                format!("{}/api/exchanges/{}/{}", self.api_url, vhost, exchange),
98            )
99            .send()
100            .await?;
101
102        handle_response(response).await
103    }
104
105    async fn create_exchange(
106        &self,
107        vhost: String,
108        exchange: String,
109        request: RabbitMqExchangeRequest,
110    ) -> Result<(), RabbitMqClientError> {
111        let exchanges = self
112            .list_exchanges(
113                Some(vhost.clone()),
114                Some(RabbitMqRequestOptions {
115                    pagination: Some(RabbitMqPagination {
116                        page: 1,
117                        page_size: None,
118                        filter: Some(RabbitMqPaginationFilter::RegexFilter(format!(
119                            "({exchange}$)"
120                        ))),
121                    }),
122                    ..Default::default()
123                }),
124            )
125            .await?;
126
127        if let Some(existing) = exchanges.items.iter().find(|e| e.name == exchange) {
128            return Err(RabbitMqClientError::AlreadyExists(format!(
129                "{} exchange",
130                existing.name
131            )));
132        }
133
134        self.update_exchange(vhost, exchange, request).await
135    }
136
137    async fn update_exchange(
138        &self,
139        vhost: String,
140        exchange: String,
141        request: RabbitMqExchangeRequest,
142    ) -> Result<(), RabbitMqClientError> {
143        let response = self
144            .client
145            .request(
146                reqwest::Method::PUT,
147                format!("{}/api/exchanges/{}/{}", self.api_url, vhost, exchange),
148            )
149            .json(&request)
150            .send()
151            .await?;
152
153        handle_empty_response(response).await
154    }
155
156    async fn delete_exchange(
157        &self,
158        vhost: String,
159        exchange: String,
160    ) -> Result<(), RabbitMqClientError> {
161        let response = self
162            .client
163            .request(
164                reqwest::Method::DELETE,
165                format!("{}/api/exchanges/{}/{}", self.api_url, vhost, exchange),
166            )
167            .send()
168            .await?;
169
170        handle_empty_response(response).await
171    }
172
173    async fn list_source_bindings(
174        &self,
175        vhost: String,
176        exchange: String,
177    ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError> {
178        let response = self
179            .client
180            .request(
181                reqwest::Method::GET,
182                format!(
183                    "{}/api/exchanges/{}/{}/bindings/source",
184                    self.api_url, vhost, exchange
185                ),
186            )
187            .send()
188            .await?;
189
190        handle_response(response).await
191    }
192
193    async fn list_destination_bindings(
194        &self,
195        vhost: String,
196        exchange: String,
197    ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError> {
198        let response = self
199            .client
200            .request(
201                reqwest::Method::GET,
202                format!(
203                    "{}/api/exchanges/{}/{}/bindings/destination",
204                    self.api_url, vhost, exchange
205                ),
206            )
207            .send()
208            .await?;
209
210        handle_response(response).await
211    }
212}
213
/// An exchange as deserialized from the responses of
/// `ExchangeApi::get_exchange` and `ExchangeApi::list_exchanges`.
///
/// Field meanings beyond their names are not established by this file;
/// presumably they mirror the RabbitMQ management API's exchange object —
/// confirm against the HTTP API reference.
#[derive(Debug, Deserialize)]
pub struct RabbitMqExchange {
    /// The `auto_delete` flag of the exchange.
    pub auto_delete: bool,
    /// The `durable` flag of the exchange.
    pub durable: bool,
    /// The `internal` flag of the exchange.
    pub internal: bool,
    /// Name of the exchange; `create_exchange` compares against this field.
    pub name: String,
    /// Exchange type; read from the JSON key `type` (a Rust keyword, hence the rename).
    #[serde(rename = "type")]
    pub kind: String,
    /// Read from the JSON key `user_who_performed_action`; required, so
    /// deserialization fails if the key is absent.
    pub user_who_performed_action: String,
    /// Virtual host the exchange is reported under.
    pub vhost: String,
    /// Message statistics, `None` when the response carries none.
    pub message_stats: Option<RabbitMqExchangeMessageStats>,
}
226
/// Optional message counters attached to a [`RabbitMqExchange`].
///
/// Both counters are optional in the deserialized payload. Presumably they
/// count messages published into and routed out of the exchange — confirm
/// against the RabbitMQ HTTP API reference.
#[derive(Debug, Deserialize)]
pub struct RabbitMqExchangeMessageStats {
    /// Value of the JSON key `publish_in`, if present.
    pub publish_in: Option<i64>,
    /// Value of the JSON key `publish_out`, if present.
    pub publish_out: Option<i64>,
}
232
233#[derive(Debug, Serialize)]
234pub struct RabbitMqExchangeRequest {
235    #[serde(rename = "type")]
236    pub kind: String,
237    pub auto_delete: bool,
238    pub durable: bool,
239    pub internal: bool,
240}