rabbitmq_management_client/api/
exchange.rs1use crate::api::_generic::{handle_empty_response, handle_response};
2use crate::api::binding::RabbitMqBinding;
3use crate::api::{RabbitMqPaginatedResponse, RabbitMqPagination, RabbitMqPaginationFilter};
4use crate::errors::RabbitMqClientError;
5use crate::RabbitMqClient;
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8
9use super::options::pagination::RabbitMqPaginationRequest;
10use super::options::RabbitMqRequestOptions;
11
12#[async_trait]
13pub trait ExchangeApi {
14 async fn list_exchanges(
15 &self,
16 vhost: Option<String>,
17 options: Option<RabbitMqRequestOptions>,
18 ) -> Result<RabbitMqPaginatedResponse<RabbitMqExchange>, RabbitMqClientError>;
19
20 async fn get_exchange(
21 &self,
22 vhost: String,
23 exchange: String,
24 ) -> Result<RabbitMqExchange, RabbitMqClientError>;
25
26 async fn create_exchange(
27 &self,
28 vhost: String,
29 exchange: String,
30 request: RabbitMqExchangeRequest,
31 ) -> Result<(), RabbitMqClientError>;
32
33 async fn update_exchange(
34 &self,
35 vhost: String,
36 exchange: String,
37 request: RabbitMqExchangeRequest,
38 ) -> Result<(), RabbitMqClientError>;
39
40 async fn delete_exchange(
41 &self,
42 vhost: String,
43 exchange: String,
44 ) -> Result<(), RabbitMqClientError>;
45
46 async fn list_source_bindings(
47 &self,
48 vhost: String,
49 exchange: String,
50 ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError>;
51
52 async fn list_destination_bindings(
53 &self,
54 vhost: String,
55 exchange: String,
56 ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError>;
57}
58
59#[async_trait]
60impl ExchangeApi for RabbitMqClient {
61 async fn list_exchanges(
62 &self,
63 vhost: Option<String>,
64 options: Option<RabbitMqRequestOptions>,
65 ) -> Result<RabbitMqPaginatedResponse<RabbitMqExchange>, RabbitMqClientError> {
66 let options: RabbitMqRequestOptions = options.unwrap_or_default();
67 let pagination: RabbitMqPaginationRequest = options.pagination.unwrap_or_default().into();
68
69 let response = self
70 .client
71 .request(
72 reqwest::Method::GET,
73 format!(
74 "{}/api/exchanges/{}",
75 self.api_url,
76 vhost.unwrap_or_default()
77 ),
78 )
79 .query(&pagination)
80 .query(&options.sorting)
81 .query(&[("disable_stats", options.disable_stats)])
82 .send()
83 .await?;
84
85 handle_response(response).await
86 }
87
88 async fn get_exchange(
89 &self,
90 vhost: String,
91 exchange: String,
92 ) -> Result<RabbitMqExchange, RabbitMqClientError> {
93 let response = self
94 .client
95 .request(
96 reqwest::Method::GET,
97 format!("{}/api/exchanges/{}/{}", self.api_url, vhost, exchange),
98 )
99 .send()
100 .await?;
101
102 handle_response(response).await
103 }
104
105 async fn create_exchange(
106 &self,
107 vhost: String,
108 exchange: String,
109 request: RabbitMqExchangeRequest,
110 ) -> Result<(), RabbitMqClientError> {
111 let exchanges = self
112 .list_exchanges(
113 Some(vhost.clone()),
114 Some(RabbitMqRequestOptions {
115 pagination: Some(RabbitMqPagination {
116 page: 1,
117 page_size: None,
118 filter: Some(RabbitMqPaginationFilter::RegexFilter(format!(
119 "({exchange}$)"
120 ))),
121 }),
122 ..Default::default()
123 }),
124 )
125 .await?;
126
127 if let Some(existing) = exchanges.items.iter().find(|e| e.name == exchange) {
128 return Err(RabbitMqClientError::AlreadyExists(format!(
129 "{} exchange",
130 existing.name
131 )));
132 }
133
134 self.update_exchange(vhost, exchange, request).await
135 }
136
137 async fn update_exchange(
138 &self,
139 vhost: String,
140 exchange: String,
141 request: RabbitMqExchangeRequest,
142 ) -> Result<(), RabbitMqClientError> {
143 let response = self
144 .client
145 .request(
146 reqwest::Method::PUT,
147 format!("{}/api/exchanges/{}/{}", self.api_url, vhost, exchange),
148 )
149 .json(&request)
150 .send()
151 .await?;
152
153 handle_empty_response(response).await
154 }
155
156 async fn delete_exchange(
157 &self,
158 vhost: String,
159 exchange: String,
160 ) -> Result<(), RabbitMqClientError> {
161 let response = self
162 .client
163 .request(
164 reqwest::Method::DELETE,
165 format!("{}/api/exchanges/{}/{}", self.api_url, vhost, exchange),
166 )
167 .send()
168 .await?;
169
170 handle_empty_response(response).await
171 }
172
173 async fn list_source_bindings(
174 &self,
175 vhost: String,
176 exchange: String,
177 ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError> {
178 let response = self
179 .client
180 .request(
181 reqwest::Method::GET,
182 format!(
183 "{}/api/exchanges/{}/{}/bindings/source",
184 self.api_url, vhost, exchange
185 ),
186 )
187 .send()
188 .await?;
189
190 handle_response(response).await
191 }
192
193 async fn list_destination_bindings(
194 &self,
195 vhost: String,
196 exchange: String,
197 ) -> Result<Vec<RabbitMqBinding>, RabbitMqClientError> {
198 let response = self
199 .client
200 .request(
201 reqwest::Method::GET,
202 format!(
203 "{}/api/exchanges/{}/{}/bindings/destination",
204 self.api_url, vhost, exchange
205 ),
206 )
207 .send()
208 .await?;
209
210 handle_response(response).await
211 }
212}
213
214#[derive(Debug, Deserialize)]
215pub struct RabbitMqExchange {
216 pub auto_delete: bool,
217 pub durable: bool,
218 pub internal: bool,
219 pub name: String,
220 #[serde(rename = "type")]
221 pub kind: String,
222 pub user_who_performed_action: String,
223 pub vhost: String,
224 pub message_stats: Option<RabbitMqExchangeMessageStats>,
225}
226
227#[derive(Debug, Deserialize)]
228pub struct RabbitMqExchangeMessageStats {
229 pub publish_in: Option<i64>,
230 pub publish_out: Option<i64>,
231}
232
233#[derive(Debug, Serialize)]
234pub struct RabbitMqExchangeRequest {
235 #[serde(rename = "type")]
236 pub kind: String,
237 pub auto_delete: bool,
238 pub durable: bool,
239 pub internal: bool,
240}