1#![deny(missing_docs)]
2
3use amq_protocol_types::{ChannelId, FrameSize, Heartbeat};
9use url::Url;
10
11use std::{fmt, num::ParseIntError, str::FromStr};
12
/// A parsed AMQP URI, split into its scheme, authority, vhost and query parameters.
///
/// Values are usually obtained through the `FromStr` implementation
/// (`"amqp://…".parse::<AMQPUri>()`); `AMQPUri::default()` describes
/// `amqp://guest:guest@localhost:5672` with the vhost `/`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AMQPUri {
    /// The URI scheme (`amqp` or `amqps`).
    pub scheme: AMQPScheme,
    /// The credentials, host and port taken from the URI authority.
    pub authority: AMQPAuthority,
    /// The virtual host: the percent-decoded URI path without its leading `/`.
    pub vhost: String,
    /// The recognised query-string parameters.
    pub query: AMQPQueryString,
}
25
/// The URI scheme of an AMQP connection string.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum AMQPScheme {
    /// The `amqp` scheme; this is the default variant.
    #[default]
    AMQP,
    /// The `amqps` scheme.
    AMQPS,
}

impl FromStr for AMQPScheme {
    type Err = String;

    /// Parses the exact scheme names `amqp` and `amqps` (the comparison is case-sensitive).
    ///
    /// # Errors
    ///
    /// Any other input is rejected with the message `Invalid AMQP scheme: <input>`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "amqp" {
            Ok(Self::AMQP)
        } else if s == "amqps" {
            Ok(Self::AMQPS)
        } else {
            Err(format!("Invalid AMQP scheme: {s}"))
        }
    }
}
47
/// The authority part of an AMQP URI: who connects, and where to.
///
/// The `Default` implementation yields the default credentials, the host
/// `localhost` and the default port of the default scheme.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AMQPAuthority {
    /// The credentials taken from the URI's userinfo component.
    pub userinfo: AMQPUserInfo,
    /// The host name of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
}
58
/// The credentials carried in the userinfo component of an AMQP URI.
///
/// The `Default` implementation uses `guest` for both fields.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AMQPUserInfo {
    /// The user name (stored percent-decoded when parsed from a URI).
    pub username: String,
    /// The password (stored percent-decoded when parsed from a URI).
    pub password: String,
}
67
/// The query-string parameters recognised when parsing an AMQP URI.
///
/// Every field is `None` when the corresponding parameter is absent from the URI;
/// parameters with other names are ignored by the parser.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct AMQPQueryString {
    /// Value of the `frame_max` query parameter.
    pub frame_max: Option<FrameSize>,
    /// Value of the `channel_max` query parameter.
    pub channel_max: Option<ChannelId>,
    /// Value of the `heartbeat` query parameter.
    // NOTE(review): the unit is not visible in this file (presumably seconds) — confirm.
    pub heartbeat: Option<Heartbeat>,
    /// Value of the `connection_timeout` query parameter.
    // NOTE(review): the unit is not visible in this file (presumably milliseconds) — confirm.
    pub connection_timeout: Option<u64>,
    /// Value of the `auth_mechanism` query parameter (matched case-insensitively).
    pub auth_mechanism: Option<SASLMechanism>,
}
85
/// The SASL mechanisms that can be named in the `auth_mechanism` query parameter.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SASLMechanism {
    /// The mechanism displayed as `AMQPLAIN`.
    AMQPlain,
    /// The mechanism displayed as `EXTERNAL`.
    External,
    /// The mechanism displayed as `PLAIN`; this is the default variant.
    #[default]
    Plain,
    /// The mechanism displayed as `RABBIT-CR-DEMO`.
    RabbitCrDemo,
}

impl fmt::Display for SASLMechanism {
    /// Writes the upper-case name of the mechanism.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match *self {
            Self::AMQPlain => "AMQPLAIN",
            Self::External => "EXTERNAL",
            Self::Plain => "PLAIN",
            Self::RabbitCrDemo => "RABBIT-CR-DEMO",
        };
        f.write_str(name)
    }
}

impl FromStr for SASLMechanism {
    type Err = String;

    /// Parses a mechanism name, ignoring case.
    ///
    /// # Errors
    ///
    /// Unknown names are rejected with `Invalid SASL mechanism: <name>`, where
    /// `<name>` is the lowercased form of the input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Lowercase spellings of every supported mechanism.
        const KNOWN: [(&str, SASLMechanism); 4] = [
            ("amqplain", SASLMechanism::AMQPlain),
            ("external", SASLMechanism::External),
            ("plain", SASLMechanism::Plain),
            ("rabbit-cr-demo", SASLMechanism::RabbitCrDemo),
        ];

        let lowered = s.to_lowercase();
        KNOWN
            .iter()
            .find(|(name, _)| *name == lowered)
            .map(|&(_, mechanism)| mechanism)
            .ok_or_else(|| format!("Invalid SASL mechanism: {lowered}"))
    }
}
124
125fn percent_decode(s: &str) -> Result<String, String> {
126 percent_encoding::percent_decode(s.as_bytes())
127 .decode_utf8()
128 .map(|s| s.to_string())
129 .map_err(|e| e.to_string())
130}
131
132impl Default for AMQPUri {
133 fn default() -> Self {
134 AMQPUri {
135 scheme: Default::default(),
136 authority: Default::default(),
137 vhost: "/".to_string(),
138 query: Default::default(),
139 }
140 }
141}
142
143fn int_queryparam<T: FromStr<Err = ParseIntError>>(
144 url: &Url,
145 param: &str,
146) -> Result<Option<T>, String> {
147 url.query_pairs()
148 .find(|(key, _)| key == param)
149 .map_or(Ok(None), |(_, ref value)| value.parse::<T>().map(Some))
150 .map_err(|e: ParseIntError| e.to_string())
151}
152
153impl FromStr for AMQPUri {
154 type Err = String;
155
156 fn from_str(s: &str) -> Result<Self, Self::Err> {
157 let url = Url::parse(s).map_err(|e| e.to_string())?;
158 if url.cannot_be_a_base() {
159 return Err(format!("Invalid URL: '{s}'"));
160 }
161 let default = AMQPUri::default();
162 let scheme = url.scheme().parse::<AMQPScheme>()?;
163 let username = match url.username() {
164 "" => default.authority.userinfo.username,
165 username => percent_decode(username)?,
166 };
167 let password = url
168 .password()
169 .map_or(Ok(default.authority.userinfo.password), percent_decode)?;
170 let host = url
171 .domain()
172 .map_or(Ok(default.authority.host), percent_decode)?;
173 let port = url.port().unwrap_or_else(|| scheme.default_port());
174 let vhost = percent_decode(url.path().get(1..).unwrap_or("/"))?;
175 let frame_max = int_queryparam(&url, "frame_max")?;
176 let channel_max = int_queryparam(&url, "channel_max")?;
177 let heartbeat = int_queryparam(&url, "heartbeat")?;
178 let connection_timeout = int_queryparam(&url, "connection_timeout")?;
179 let auth_mechanism = url
180 .query_pairs()
181 .find(|(key, _)| key == "auth_mechanism")
182 .map_or(Ok(None), |(_, ref value)| value.parse().map(Some))?;
183
184 Ok(AMQPUri {
185 scheme,
186 authority: AMQPAuthority {
187 userinfo: AMQPUserInfo { username, password },
188 host,
189 port,
190 },
191 vhost,
192 query: AMQPQueryString {
193 frame_max,
194 channel_max,
195 heartbeat,
196 connection_timeout,
197 auth_mechanism,
198 },
199 })
200 }
201}
202
203impl AMQPScheme {
204 pub fn default_port(&self) -> u16 {
206 match *self {
207 AMQPScheme::AMQP => 5672,
208 AMQPScheme::AMQPS => 5671,
209 }
210 }
211}
212
213impl Default for AMQPAuthority {
214 fn default() -> Self {
215 AMQPAuthority {
216 userinfo: Default::default(),
217 host: "localhost".to_string(),
218 port: AMQPScheme::default().default_port(),
219 }
220 }
221}
222
223impl Default for AMQPUserInfo {
224 fn default() -> Self {
225 AMQPUserInfo {
226 username: "guest".to_string(),
227 password: "guest".to_string(),
228 }
229 }
230}
231
// Unit tests for parsing `AMQPUri` values from strings.
#[cfg(test)]
mod test {
    use super::*;

    // Without any path the vhost falls back to the default `/`.
    #[test]
    fn test_parse_amqp_no_path() {
        let uri = "amqp://localhost".parse();
        assert_eq!(uri, Ok(AMQPUri::default()));
    }

    // `%2f` is the percent-encoded form of `/`, i.e. the default vhost.
    #[test]
    fn test_parse_amqp() {
        let uri = "amqp://localhost/%2f".parse();
        assert_eq!(uri, Ok(AMQPUri::default()));
    }

    // The `amqps` scheme switches the default port to 5671, and a bare `/`
    // path yields an empty vhost.
    #[test]
    fn test_parse_amqps() {
        let uri = "amqps://localhost/".parse();
        assert_eq!(
            uri,
            Ok(AMQPUri {
                scheme: AMQPScheme::AMQPS,
                authority: AMQPAuthority {
                    port: 5671,
                    ..Default::default()
                },
                vhost: "".to_string(),
                ..Default::default()
            })
        );
    }

    // Credentials, host and vhost are read from the URI; the unknown query
    // parameter `foo` is ignored.
    #[test]
    fn test_parse_amqps_with_creds() {
        let uri = "amqps://user:pass@hostname/v?foo=bar".parse();
        assert_eq!(
            uri,
            Ok(AMQPUri {
                scheme: AMQPScheme::AMQPS,
                authority: AMQPAuthority {
                    userinfo: AMQPUserInfo {
                        username: "user".to_string(),
                        password: "pass".to_string(),
                    },
                    host: "hostname".to_string(),
                    port: 5671,
                },
                vhost: "v".to_string(),
                ..Default::default()
            })
        );
    }

    // Percent-encoded user name, password, host and vhost are decoded, and an
    // explicit port overrides the scheme default. (Despite its name, this test
    // uses the plain `amqp` scheme.)
    #[test]
    fn test_parse_amqps_with_creds_percent() {
        let uri = "amqp://user%61:%61pass@ho%61st:10000/v%2fhost".parse();
        assert_eq!(
            uri,
            Ok(AMQPUri {
                scheme: AMQPScheme::AMQP,
                authority: AMQPAuthority {
                    userinfo: AMQPUserInfo {
                        username: "usera".to_string(),
                        password: "apass".to_string(),
                    },
                    host: "hoast".to_string(),
                    port: 10000,
                },
                vhost: "v/host".to_string(),
                ..Default::default()
            })
        );
    }

    // Integer query parameters are parsed into the query-string struct;
    // parameters that are absent (here `channel_max`) stay `None`.
    #[test]
    fn test_parse_with_heartbeat_frame_max() {
        let uri = "amqp://localhost/%2f?heartbeat=42&frame_max=64&connection_timeout=30000".parse();
        assert_eq!(
            uri,
            Ok(AMQPUri {
                query: AMQPQueryString {
                    frame_max: Some(64),
                    heartbeat: Some(42),
                    connection_timeout: Some(30000),
                    ..Default::default()
                },
                ..Default::default()
            })
        );
    }

    // Input without a scheme is rejected by the URL parser itself; its error
    // message is passed through unchanged.
    #[test]
    fn test_url_with_no_base() {
        let uri: Result<AMQPUri, String> = "foo".parse();
        assert_eq!(uri, Err("relative URL without a base".to_string()));
    }

    // `foo:bar` parses as a URL but cannot be a base URL, so it is rejected
    // before the scheme is even looked at.
    #[test]
    fn test_invalid_url() {
        let uri: Result<AMQPUri, String> = "foo:bar".parse();
        assert_eq!(uri, Err("Invalid URL: 'foo:bar'".to_string()));
    }

    // A well-formed URL with a scheme other than `amqp`/`amqps` is rejected.
    #[test]
    fn test_invalid_scheme() {
        let uri: Result<AMQPUri, String> = "http://localhost/".parse();
        assert_eq!(uri, Err("Invalid AMQP scheme: http".to_string()));
    }
}