1#![deny(missing_docs, missing_debug_implementations, unsafe_code)]
2#![warn(unreachable_pub, unused_qualifications, unused_lifetimes)]
3#![warn(
4 clippy::must_use_candidate,
5 clippy::unwrap_in_result,
6 clippy::panic_in_result_fn
7)]
8
9use amq_protocol_types::{ChannelId, FrameSize, Heartbeat};
15use url::Url;
16
17use std::{fmt, num::ParseIntError, str::FromStr};
18
19#[derive(Clone, Debug, PartialEq, Eq)]
21pub struct AMQPUri {
22 pub scheme: AMQPScheme,
24 pub authority: AMQPAuthority,
26 pub vhost: String,
28 pub query: AMQPQueryString,
30}
31
/// The scheme of an AMQP URI.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum AMQPScheme {
    /// The `amqp` scheme; this is the default variant.
    #[default]
    AMQP,
    /// The `amqps` scheme.
    AMQPS,
}
41
42impl FromStr for AMQPScheme {
43 type Err = String;
44
45 fn from_str(s: &str) -> Result<Self, Self::Err> {
46 match s {
47 "amqp" => Ok(AMQPScheme::AMQP),
48 "amqps" => Ok(AMQPScheme::AMQPS),
49 s => Err(format!("Invalid AMQP scheme: {s}")),
50 }
51 }
52}
53
54impl fmt::Display for AMQPScheme {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 f.write_str(match self {
57 AMQPScheme::AMQP => "amqp",
58 AMQPScheme::AMQPS => "amqps",
59 })
60 }
61}
62
63#[derive(Clone, Debug, PartialEq, Eq)]
65pub struct AMQPAuthority {
66 pub userinfo: AMQPUserInfo,
68 pub host: String,
70 pub port: u16,
72}
73
/// The user information of an AMQP URI.
///
/// Note that the derived `Debug` output includes the password in clear text.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AMQPUserInfo {
    /// The username.
    pub username: String,
    /// The password.
    pub password: String,
}
82
83#[derive(Clone, Debug, Default, PartialEq, Eq)]
85pub struct AMQPQueryString {
86 pub frame_max: Option<FrameSize>,
88 pub channel_max: Option<ChannelId>,
90 pub heartbeat: Option<Heartbeat>,
92 pub connection_timeout: Option<u64>,
94 pub auth_mechanism: Option<SASLMechanism>,
96 }
100
/// The SASL mechanisms that can be named in an AMQP URI.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SASLMechanism {
    /// The `AMQPLAIN` mechanism.
    AMQPlain,
    /// The `ANONYMOUS` mechanism.
    Anonymous,
    /// The `EXTERNAL` mechanism.
    External,
    /// The `PLAIN` mechanism; this is the default variant.
    #[default]
    Plain,
    /// The `RABBIT-CR-DEMO` mechanism.
    RabbitCrDemo,
}
116
117impl SASLMechanism {
118 pub fn name(&self) -> &'static str {
120 match self {
121 SASLMechanism::AMQPlain => "AMQPLAIN",
122 SASLMechanism::Anonymous => "ANONYMOUS",
123 SASLMechanism::External => "EXTERNAL",
124 SASLMechanism::Plain => "PLAIN",
125 SASLMechanism::RabbitCrDemo => "RABBIT-CR-DEMO",
126 }
127 }
128}
129
130impl fmt::Display for SASLMechanism {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 f.write_str(self.name())
133 }
134}
135
136impl FromStr for SASLMechanism {
137 type Err = String;
138
139 fn from_str(s: &str) -> Result<Self, Self::Err> {
140 match s.to_lowercase().as_str() {
141 "amqplain" => Ok(SASLMechanism::AMQPlain),
142 "anonymous" => Ok(SASLMechanism::Anonymous),
143 "external" => Ok(SASLMechanism::External),
144 "plain" => Ok(SASLMechanism::Plain),
145 "rabbit-cr-demo" => Ok(SASLMechanism::RabbitCrDemo),
146 s => Err(format!("Invalid SASL mechanism: {s}")),
147 }
148 }
149}
150
151fn percent_decode(s: &str) -> Result<String, String> {
152 percent_encoding::percent_decode(s.as_bytes())
153 .decode_utf8()
154 .map(|s| s.to_string())
155 .map_err(|e| e.to_string())
156}
157
158fn percent_encode<'a>(s: &'a str) -> percent_encoding::PercentEncode<'a> {
159 percent_encoding::utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC)
160}
161
162impl Default for AMQPUri {
163 fn default() -> Self {
164 AMQPUri {
165 scheme: Default::default(),
166 authority: Default::default(),
167 vhost: "/".to_string(),
168 query: Default::default(),
169 }
170 }
171}
172
173fn int_queryparam<T: FromStr<Err = ParseIntError>>(
174 url: &Url,
175 param: &str,
176) -> Result<Option<T>, String> {
177 url.query_pairs()
178 .find(|(key, _)| key == param)
179 .map_or(Ok(None), |(_, ref value)| value.parse::<T>().map(Some))
180 .map_err(|e: ParseIntError| e.to_string())
181}
182
183impl FromStr for AMQPUri {
184 type Err = String;
185
186 fn from_str(s: &str) -> Result<Self, Self::Err> {
187 let url = Url::parse(s).map_err(|e| e.to_string())?;
188 if url.cannot_be_a_base() {
189 return Err(format!("Invalid URL: '{s}'"));
190 }
191 let default = AMQPUri::default();
192 let scheme = url.scheme().parse::<AMQPScheme>()?;
193 let username = match url.username() {
194 "" => default.authority.userinfo.username,
195 username => percent_decode(username)?,
196 };
197 let password = url
198 .password()
199 .map_or(Ok(default.authority.userinfo.password), percent_decode)?;
200 let host = url
201 .domain()
202 .map_or(Ok(default.authority.host), percent_decode)?;
203 let port = url.port().unwrap_or_else(|| scheme.default_port());
204 let vhost = percent_decode(url.path().get(1..).unwrap_or("/"))?;
205 let frame_max = int_queryparam(&url, "frame_max")?;
206 let channel_max = int_queryparam(&url, "channel_max")?;
207 let heartbeat = int_queryparam(&url, "heartbeat")?;
208 let connection_timeout = int_queryparam(&url, "connection_timeout")?;
209 let auth_mechanism = url
210 .query_pairs()
211 .find(|(key, _)| key == "auth_mechanism")
212 .map_or(Ok(None), |(_, ref value)| value.parse().map(Some))?;
213
214 Ok(AMQPUri {
215 scheme,
216 authority: AMQPAuthority {
217 userinfo: AMQPUserInfo { username, password },
218 host,
219 port,
220 },
221 vhost,
222 query: AMQPQueryString {
223 frame_max,
224 channel_max,
225 heartbeat,
226 connection_timeout,
227 auth_mechanism,
228 },
229 })
230 }
231}
232
233impl fmt::Display for AMQPUri {
234 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235 write!(
236 f,
237 "{}://{}:{}@{}:{}/{}",
238 self.scheme,
239 percent_encode(&self.authority.userinfo.username),
240 percent_encode(&self.authority.userinfo.password),
241 self.authority.host,
242 self.authority.port,
243 percent_encode(&self.vhost),
244 )?;
245 let mut sep = '?';
246 if let Some(v) = self.query.frame_max {
247 write!(f, "{sep}frame_max={v}")?;
248 sep = '&';
249 }
250 if let Some(v) = self.query.channel_max {
251 write!(f, "{sep}channel_max={v}")?;
252 sep = '&';
253 }
254 if let Some(v) = self.query.heartbeat {
255 write!(f, "{sep}heartbeat={v}")?;
256 sep = '&';
257 }
258 if let Some(v) = self.query.connection_timeout {
259 write!(f, "{sep}connection_timeout={v}")?;
260 sep = '&';
261 }
262 if let Some(v) = self.query.auth_mechanism {
263 write!(f, "{sep}auth_mechanism={v}")?;
264 }
265 Ok(())
266 }
267}
268
269impl AMQPScheme {
270 pub fn default_port(&self) -> u16 {
272 match *self {
273 AMQPScheme::AMQP => 5672,
274 AMQPScheme::AMQPS => 5671,
275 }
276 }
277}
278
279impl Default for AMQPAuthority {
280 fn default() -> Self {
281 AMQPAuthority {
282 userinfo: Default::default(),
283 host: "localhost".to_string(),
284 port: AMQPScheme::default().default_port(),
285 }
286 }
287}
288
289impl Default for AMQPUserInfo {
290 fn default() -> Self {
291 AMQPUserInfo {
292 username: "guest".to_string(),
293 password: "guest".to_string(),
294 }
295 }
296}
297
#[cfg(test)]
mod test {
    use super::*;

    /// Parses `input` with the result type pinned, so call sites need no
    /// type annotation.
    fn parse(input: &str) -> Result<AMQPUri, String> {
        input.parse()
    }

    #[test]
    fn test_parse_amqp_no_path() {
        // No path at all: the vhost keeps its default value.
        assert_eq!(parse("amqp://localhost"), Ok(AMQPUri::default()));
    }

    #[test]
    fn test_parse_amqp() {
        // `%2f` decodes to `/`, which is also the default vhost.
        assert_eq!(parse("amqp://localhost/%2f"), Ok(AMQPUri::default()));
    }

    #[test]
    fn test_parse_amqps() {
        // A path of just `/` gives the empty vhost, and the port follows the scheme.
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQPS,
            authority: AMQPAuthority {
                port: 5671,
                ..AMQPAuthority::default()
            },
            vhost: String::new(),
            ..AMQPUri::default()
        };
        assert_eq!(parse("amqps://localhost/"), Ok(expected));
    }

    #[test]
    fn test_parse_amqps_with_creds() {
        // The unrecognised `foo` parameter must be ignored.
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQPS,
            authority: AMQPAuthority {
                userinfo: AMQPUserInfo {
                    username: String::from("user"),
                    password: String::from("pass"),
                },
                host: String::from("hostname"),
                port: 5671,
            },
            vhost: String::from("v"),
            ..AMQPUri::default()
        };
        assert_eq!(parse("amqps://user:pass@hostname/v?foo=bar"), Ok(expected));
    }

    #[test]
    fn test_parse_amqps_with_creds_percent() {
        // Every component carries a percent-escape that must be decoded.
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQP,
            authority: AMQPAuthority {
                userinfo: AMQPUserInfo {
                    username: String::from("usera"),
                    password: String::from("apass"),
                },
                host: String::from("hoast"),
                port: 10000,
            },
            vhost: String::from("v/host"),
            ..AMQPUri::default()
        };
        assert_eq!(
            parse("amqp://user%61:%61pass@ho%61st:10000/v%2fhost"),
            Ok(expected)
        );
    }

    #[test]
    fn test_parse_with_heartbeat_frame_max() {
        let expected = AMQPUri {
            query: AMQPQueryString {
                frame_max: Some(64),
                heartbeat: Some(42),
                connection_timeout: Some(30000),
                ..AMQPQueryString::default()
            },
            ..AMQPUri::default()
        };
        assert_eq!(
            parse("amqp://localhost/%2f?heartbeat=42&frame_max=64&connection_timeout=30000"),
            Ok(expected)
        );
    }

    #[test]
    fn test_url_with_no_base() {
        assert_eq!(
            parse("foo"),
            Err(String::from("relative URL without a base"))
        );
    }

    #[test]
    fn test_invalid_url() {
        assert_eq!(parse("foo:bar"), Err(String::from("Invalid URL: 'foo:bar'")));
    }

    #[test]
    fn test_invalid_scheme() {
        assert_eq!(
            parse("http://localhost/"),
            Err(String::from("Invalid AMQP scheme: http"))
        );
    }
}