1#![deny(missing_docs)]
2
3use amq_protocol_types::{ChannelId, FrameSize, Heartbeat};
9use url::Url;
10
11use std::{fmt, num::ParseIntError, str::FromStr};
12
/// A parsed AMQP URI, split into its scheme, authority, virtual host and
/// query parameters.
///
/// Values are usually obtained by parsing a string through the [`FromStr`]
/// implementation, or by starting from [`AMQPUri::default`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AMQPUri {
    /// The scheme of the URI (`amqp` or `amqps`).
    pub scheme: AMQPScheme,
    /// The authority part of the URI: credentials, host and port.
    pub authority: AMQPAuthority,
    /// The virtual host: the percent-decoded URI path without its leading `/`
    /// (`"/"` when the URI has no path at all).
    pub vhost: String,
    /// The query parameters recognised while parsing.
    pub query: AMQPQueryString,
}
25
/// The scheme of an AMQP URI.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum AMQPScheme {
    /// The `amqp` scheme (default port 5672). This is the default variant.
    #[default]
    AMQP,
    /// The `amqps` scheme (default port 5671).
    AMQPS,
}
35
36impl FromStr for AMQPScheme {
37 type Err = String;
38
39 fn from_str(s: &str) -> Result<Self, Self::Err> {
40 match s {
41 "amqp" => Ok(AMQPScheme::AMQP),
42 "amqps" => Ok(AMQPScheme::AMQPS),
43 s => Err(format!("Invalid AMQP scheme: {s}")),
44 }
45 }
46}
47
/// The authority part of an AMQP URI: user information, host and port.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AMQPAuthority {
    /// The credentials carried by the URI.
    pub userinfo: AMQPUserInfo,
    /// The host of the server (`"localhost"` by default).
    pub host: String,
    /// The port of the server; when parsing a URI without an explicit port,
    /// the default port of the URI's scheme is used.
    pub port: u16,
}
58
/// The user information (credentials) of an AMQP URI.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AMQPUserInfo {
    /// The user name (`"guest"` by default).
    pub username: String,
    /// The password (`"guest"` by default).
    pub password: String,
}
67
/// The query parameters recognised in an AMQP URI.
///
/// Every field is `None` when the corresponding parameter is absent from the
/// URI; unrecognised parameters are ignored by the parser.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct AMQPQueryString {
    /// Value of the `frame_max` query parameter.
    pub frame_max: Option<FrameSize>,
    /// Value of the `channel_max` query parameter.
    pub channel_max: Option<ChannelId>,
    /// Value of the `heartbeat` query parameter.
    pub heartbeat: Option<Heartbeat>,
    /// Value of the `connection_timeout` query parameter.
    // NOTE(review): the unit is not visible in this file (presumably
    // milliseconds) — confirm against the consumer of this field.
    pub connection_timeout: Option<u64>,
    /// Value of the `auth_mechanism` query parameter.
    pub auth_mechanism: Option<SASLMechanism>,
}
85
/// The SASL mechanisms that can be selected through the `auth_mechanism`
/// query parameter.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SASLMechanism {
    /// The `AMQPLAIN` mechanism.
    AMQPlain,
    /// The `ANONYMOUS` mechanism.
    Anonymous,
    /// The `EXTERNAL` mechanism.
    External,
    /// The `PLAIN` mechanism. This is the default variant.
    #[default]
    Plain,
    /// The `RABBIT-CR-DEMO` mechanism.
    RabbitCrDemo,
}
101
102impl SASLMechanism {
103 pub fn name(&self) -> &'static str {
105 match self {
106 SASLMechanism::AMQPlain => "AMQPLAIN",
107 SASLMechanism::Anonymous => "ANONYMOUS",
108 SASLMechanism::External => "EXTERNAL",
109 SASLMechanism::Plain => "PLAIN",
110 SASLMechanism::RabbitCrDemo => "RABBIT-CR-DEMO",
111 }
112 }
113}
114
115impl fmt::Display for SASLMechanism {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 f.write_str(self.name())
118 }
119}
120
121impl FromStr for SASLMechanism {
122 type Err = String;
123
124 fn from_str(s: &str) -> Result<Self, Self::Err> {
125 match s.to_lowercase().as_str() {
126 "amqplain" => Ok(SASLMechanism::AMQPlain),
127 "anonymous" => Ok(SASLMechanism::Anonymous),
128 "external" => Ok(SASLMechanism::External),
129 "plain" => Ok(SASLMechanism::Plain),
130 "rabbit-cr-demo" => Ok(SASLMechanism::RabbitCrDemo),
131 s => Err(format!("Invalid SASL mechanism: {s}")),
132 }
133 }
134}
135
136fn percent_decode(s: &str) -> Result<String, String> {
137 percent_encoding::percent_decode(s.as_bytes())
138 .decode_utf8()
139 .map(|s| s.to_string())
140 .map_err(|e| e.to_string())
141}
142
143impl Default for AMQPUri {
144 fn default() -> Self {
145 AMQPUri {
146 scheme: Default::default(),
147 authority: Default::default(),
148 vhost: "/".to_string(),
149 query: Default::default(),
150 }
151 }
152}
153
154fn int_queryparam<T: FromStr<Err = ParseIntError>>(
155 url: &Url,
156 param: &str,
157) -> Result<Option<T>, String> {
158 url.query_pairs()
159 .find(|(key, _)| key == param)
160 .map_or(Ok(None), |(_, ref value)| value.parse::<T>().map(Some))
161 .map_err(|e: ParseIntError| e.to_string())
162}
163
164impl FromStr for AMQPUri {
165 type Err = String;
166
167 fn from_str(s: &str) -> Result<Self, Self::Err> {
168 let url = Url::parse(s).map_err(|e| e.to_string())?;
169 if url.cannot_be_a_base() {
170 return Err(format!("Invalid URL: '{s}'"));
171 }
172 let default = AMQPUri::default();
173 let scheme = url.scheme().parse::<AMQPScheme>()?;
174 let username = match url.username() {
175 "" => default.authority.userinfo.username,
176 username => percent_decode(username)?,
177 };
178 let password = url
179 .password()
180 .map_or(Ok(default.authority.userinfo.password), percent_decode)?;
181 let host = url
182 .domain()
183 .map_or(Ok(default.authority.host), percent_decode)?;
184 let port = url.port().unwrap_or_else(|| scheme.default_port());
185 let vhost = percent_decode(url.path().get(1..).unwrap_or("/"))?;
186 let frame_max = int_queryparam(&url, "frame_max")?;
187 let channel_max = int_queryparam(&url, "channel_max")?;
188 let heartbeat = int_queryparam(&url, "heartbeat")?;
189 let connection_timeout = int_queryparam(&url, "connection_timeout")?;
190 let auth_mechanism = url
191 .query_pairs()
192 .find(|(key, _)| key == "auth_mechanism")
193 .map_or(Ok(None), |(_, ref value)| value.parse().map(Some))?;
194
195 Ok(AMQPUri {
196 scheme,
197 authority: AMQPAuthority {
198 userinfo: AMQPUserInfo { username, password },
199 host,
200 port,
201 },
202 vhost,
203 query: AMQPQueryString {
204 frame_max,
205 channel_max,
206 heartbeat,
207 connection_timeout,
208 auth_mechanism,
209 },
210 })
211 }
212}
213
214impl AMQPScheme {
215 pub fn default_port(&self) -> u16 {
217 match *self {
218 AMQPScheme::AMQP => 5672,
219 AMQPScheme::AMQPS => 5671,
220 }
221 }
222}
223
224impl Default for AMQPAuthority {
225 fn default() -> Self {
226 AMQPAuthority {
227 userinfo: Default::default(),
228 host: "localhost".to_string(),
229 port: AMQPScheme::default().default_port(),
230 }
231 }
232}
233
234impl Default for AMQPUserInfo {
235 fn default() -> Self {
236 AMQPUserInfo {
237 username: "guest".to_string(),
238 password: "guest".to_string(),
239 }
240 }
241}
242
#[cfg(test)]
mod test {
    use super::*;

    // Parses `input` as an `AMQPUri`, pinning the result type so that the
    // assertions below need no type annotations.
    fn parse(input: &str) -> Result<AMQPUri, String> {
        input.parse()
    }

    #[test]
    fn test_parse_amqp_no_path() {
        assert_eq!(parse("amqp://localhost"), Ok(AMQPUri::default()));
    }

    #[test]
    fn test_parse_amqp() {
        assert_eq!(parse("amqp://localhost/%2f"), Ok(AMQPUri::default()));
    }

    #[test]
    fn test_parse_amqps() {
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQPS,
            authority: AMQPAuthority {
                port: 5671,
                ..Default::default()
            },
            // A bare "/" path means an empty virtual host name.
            vhost: String::new(),
            ..Default::default()
        };
        assert_eq!(parse("amqps://localhost/"), Ok(expected));
    }

    #[test]
    fn test_parse_amqps_with_creds() {
        let userinfo = AMQPUserInfo {
            username: "user".to_string(),
            password: "pass".to_string(),
        };
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQPS,
            authority: AMQPAuthority {
                userinfo,
                host: "hostname".to_string(),
                port: 5671,
            },
            vhost: "v".to_string(),
            ..Default::default()
        };
        // The unknown `foo` query parameter must be ignored.
        assert_eq!(parse("amqps://user:pass@hostname/v?foo=bar"), Ok(expected));
    }

    #[test]
    fn test_parse_amqps_with_creds_percent() {
        let userinfo = AMQPUserInfo {
            username: "usera".to_string(),
            password: "apass".to_string(),
        };
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQP,
            authority: AMQPAuthority {
                userinfo,
                host: "hoast".to_string(),
                port: 10000,
            },
            vhost: "v/host".to_string(),
            ..Default::default()
        };
        assert_eq!(
            parse("amqp://user%61:%61pass@ho%61st:10000/v%2fhost"),
            Ok(expected)
        );
    }

    #[test]
    fn test_parse_with_heartbeat_frame_max() {
        let query = AMQPQueryString {
            frame_max: Some(64),
            heartbeat: Some(42),
            connection_timeout: Some(30000),
            ..Default::default()
        };
        let expected = AMQPUri {
            query,
            ..Default::default()
        };
        assert_eq!(
            parse("amqp://localhost/%2f?heartbeat=42&frame_max=64&connection_timeout=30000"),
            Ok(expected)
        );
    }

    #[test]
    fn test_url_with_no_base() {
        assert_eq!(
            parse("foo"),
            Err("relative URL without a base".to_string())
        );
    }

    #[test]
    fn test_invalid_url() {
        assert_eq!(parse("foo:bar"), Err("Invalid URL: 'foo:bar'".to_string()));
    }

    #[test]
    fn test_invalid_scheme() {
        assert_eq!(
            parse("http://localhost/"),
            Err("Invalid AMQP scheme: http".to_string())
        );
    }
}