Skip to main content

amq_protocol_uri/
lib.rs

1#![deny(missing_docs, missing_debug_implementations, unsafe_code)]
2#![warn(unreachable_pub, unused_qualifications, unused_lifetimes)]
3#![warn(
4    clippy::must_use_candidate,
5    clippy::unwrap_in_result,
6    clippy::panic_in_result_fn
7)]
8
9//! # AMQP URI manipulation library
10//!
//! amq-protocol-uri is a library that provides tools to help
//! manage AMQP URIs
13
14use amq_protocol_types::{ChannelId, FrameSize, Heartbeat};
15use url::Url;
16
17use std::{fmt, num::ParseIntError, str::FromStr};
18
19/// An AMQP Uri
20#[derive(Clone, Debug, PartialEq, Eq)]
21pub struct AMQPUri {
22    /// The scheme used by the AMQP connection
23    pub scheme: AMQPScheme,
24    /// The connection information
25    pub authority: AMQPAuthority,
26    /// The target vhost
27    pub vhost: String,
28    /// The optional query string to pass parameters to the server
29    pub query: AMQPQueryString,
30}
31
/// The scheme of an AMQP URI, i.e. the flavour of the AMQP connection
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum AMQPScheme {
    /// Plain AMQP (`amqp`); this is the default scheme
    #[default]
    AMQP,
    /// AMQP encrypted over TLS (`amqps`)
    AMQPS,
}

impl FromStr for AMQPScheme {
    type Err = String;

    /// Parses `amqp` or `amqps` (exact match); any other input is rejected
    /// with a message quoting it.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "amqps" => Ok(Self::AMQPS),
            "amqp" => Ok(Self::AMQP),
            unknown => Err(format!("Invalid AMQP scheme: {unknown}")),
        }
    }
}

impl fmt::Display for AMQPScheme {
    /// Writes the scheme name exactly as accepted by `from_str`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::AMQP => "amqp",
            Self::AMQPS => "amqps",
        };
        f.write_str(name)
    }
}
62
63/// The connection information
64#[derive(Clone, Debug, PartialEq, Eq)]
65pub struct AMQPAuthority {
66    /// The credentials used to connect to the server
67    pub userinfo: AMQPUserInfo,
68    /// The server's host
69    pub host: String,
70    /// The port the server listens on
71    pub port: u16,
72}
73
/// The username/password pair used to connect to the server
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AMQPUserInfo {
    /// Name of the user to connect as
    pub username: String,
    /// Password of that user
    pub password: String,
}
82
83/// The optional query string to pass parameters to the server
84#[derive(Clone, Debug, Default, PartialEq, Eq)]
85pub struct AMQPQueryString {
86    /// The maximum size of an AMQP Frame
87    pub frame_max: Option<FrameSize>,
88    /// The maximum number of open channels
89    pub channel_max: Option<ChannelId>,
90    /// The maximum time between two heartbeats
91    pub heartbeat: Option<Heartbeat>,
92    /// The maximum time to wait (in milliseconds) for the connection to succeed
93    pub connection_timeout: Option<u64>,
94    /// The SASL mechanism used for authentication
95    pub auth_mechanism: Option<SASLMechanism>,
96    // Fields available in Erlang implementation for SSL settings:
97    // cacertfile, certfile, keyfile, verify, fail_if_no_peer_cert, password,
98    // server_name_indication, depth
99}
100
/// The SASL mechanisms supported by RabbitMQ
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SASLMechanism {
    /// This is a legacy mechanism kept for backward compatibility
    AMQPlain,
    /// Anonymous authentication if supported by the RabbitMQ server
    Anonymous,
    /// Delegate all authentication to the transport instead of the RabbitMQ server
    External,
    /// Default plain login, this should be supported everywhere
    #[default]
    Plain,
    /// A demo of RabbitMQ SecureOk mechanism, offers the same level of security as Plain
    RabbitCrDemo,
}

impl SASLMechanism {
    /// Every mechanism, in declaration order.
    ///
    /// Parsing resolves names through this table, so a new variant must be
    /// listed here to become parseable.
    const ALL: [SASLMechanism; 5] = [
        SASLMechanism::AMQPlain,
        SASLMechanism::Anonymous,
        SASLMechanism::External,
        SASLMechanism::Plain,
        SASLMechanism::RabbitCrDemo,
    ];

    /// Get the name of the SASL mechanism as str
    ///
    /// This is the upper-case spelling also produced by the `Display` impl.
    #[must_use]
    pub fn name(&self) -> &'static str {
        match self {
            SASLMechanism::AMQPlain => "AMQPLAIN",
            SASLMechanism::Anonymous => "ANONYMOUS",
            SASLMechanism::External => "EXTERNAL",
            SASLMechanism::Plain => "PLAIN",
            SASLMechanism::RabbitCrDemo => "RABBIT-CR-DEMO",
        }
    }
}

impl fmt::Display for SASLMechanism {
    /// Writes the mechanism's name, see [`SASLMechanism::name`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}

impl FromStr for SASLMechanism {
    type Err = String;

    /// Parses a mechanism from its name, ignoring ASCII case.
    ///
    /// # Errors
    ///
    /// Returns a message quoting the input, spelled exactly as the caller
    /// wrote it, when it names no known mechanism.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Comparing against `name()` keeps a single source of truth for the
        // spellings and avoids allocating a lowercased copy of the input.
        Self::ALL
            .iter()
            .copied()
            .find(|mechanism| s.eq_ignore_ascii_case(mechanism.name()))
            .ok_or_else(|| format!("Invalid SASL mechanism: {s}"))
    }
}
150
151fn percent_decode(s: &str) -> Result<String, String> {
152    percent_encoding::percent_decode(s.as_bytes())
153        .decode_utf8()
154        .map(|s| s.to_string())
155        .map_err(|e| e.to_string())
156}
157
158fn percent_encode<'a>(s: &'a str) -> percent_encoding::PercentEncode<'a> {
159    percent_encoding::utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC)
160}
161
162impl Default for AMQPUri {
163    fn default() -> Self {
164        AMQPUri {
165            scheme: Default::default(),
166            authority: Default::default(),
167            vhost: "/".to_string(),
168            query: Default::default(),
169        }
170    }
171}
172
173fn int_queryparam<T: FromStr<Err = ParseIntError>>(
174    url: &Url,
175    param: &str,
176) -> Result<Option<T>, String> {
177    url.query_pairs()
178        .find(|(key, _)| key == param)
179        .map_or(Ok(None), |(_, ref value)| value.parse::<T>().map(Some))
180        .map_err(|e: ParseIntError| e.to_string())
181}
182
183impl FromStr for AMQPUri {
184    type Err = String;
185
186    fn from_str(s: &str) -> Result<Self, Self::Err> {
187        let url = Url::parse(s).map_err(|e| e.to_string())?;
188        if url.cannot_be_a_base() {
189            return Err(format!("Invalid URL: '{s}'"));
190        }
191        let default = AMQPUri::default();
192        let scheme = url.scheme().parse::<AMQPScheme>()?;
193        let username = match url.username() {
194            "" => default.authority.userinfo.username,
195            username => percent_decode(username)?,
196        };
197        let password = url
198            .password()
199            .map_or(Ok(default.authority.userinfo.password), percent_decode)?;
200        let host = url
201            .domain()
202            .map_or(Ok(default.authority.host), percent_decode)?;
203        let port = url.port().unwrap_or_else(|| scheme.default_port());
204        let vhost = percent_decode(url.path().get(1..).unwrap_or("/"))?;
205        let frame_max = int_queryparam(&url, "frame_max")?;
206        let channel_max = int_queryparam(&url, "channel_max")?;
207        let heartbeat = int_queryparam(&url, "heartbeat")?;
208        let connection_timeout = int_queryparam(&url, "connection_timeout")?;
209        let auth_mechanism = url
210            .query_pairs()
211            .find(|(key, _)| key == "auth_mechanism")
212            .map_or(Ok(None), |(_, ref value)| value.parse().map(Some))?;
213
214        Ok(AMQPUri {
215            scheme,
216            authority: AMQPAuthority {
217                userinfo: AMQPUserInfo { username, password },
218                host,
219                port,
220            },
221            vhost,
222            query: AMQPQueryString {
223                frame_max,
224                channel_max,
225                heartbeat,
226                connection_timeout,
227                auth_mechanism,
228            },
229        })
230    }
231}
232
233impl fmt::Display for AMQPUri {
234    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235        write!(
236            f,
237            "{}://{}:{}@{}:{}/{}",
238            self.scheme,
239            percent_encode(&self.authority.userinfo.username),
240            percent_encode(&self.authority.userinfo.password),
241            self.authority.host,
242            self.authority.port,
243            percent_encode(&self.vhost),
244        )?;
245        let mut sep = '?';
246        if let Some(v) = self.query.frame_max {
247            write!(f, "{sep}frame_max={v}")?;
248            sep = '&';
249        }
250        if let Some(v) = self.query.channel_max {
251            write!(f, "{sep}channel_max={v}")?;
252            sep = '&';
253        }
254        if let Some(v) = self.query.heartbeat {
255            write!(f, "{sep}heartbeat={v}")?;
256            sep = '&';
257        }
258        if let Some(v) = self.query.connection_timeout {
259            write!(f, "{sep}connection_timeout={v}")?;
260            sep = '&';
261        }
262        if let Some(v) = self.query.auth_mechanism {
263            write!(f, "{sep}auth_mechanism={v}")?;
264        }
265        Ok(())
266    }
267}
268
269impl AMQPScheme {
270    /// The default port for this scheme
271    pub fn default_port(&self) -> u16 {
272        match *self {
273            AMQPScheme::AMQP => 5672,
274            AMQPScheme::AMQPS => 5671,
275        }
276    }
277}
278
279impl Default for AMQPAuthority {
280    fn default() -> Self {
281        AMQPAuthority {
282            userinfo: Default::default(),
283            host: "localhost".to_string(),
284            port: AMQPScheme::default().default_port(),
285        }
286    }
287}
288
289impl Default for AMQPUserInfo {
290    fn default() -> Self {
291        AMQPUserInfo {
292            username: "guest".to_string(),
293            password: "guest".to_string(),
294        }
295    }
296}
297
#[cfg(test)]
mod test {
    use super::*;

    /// Parses `input` as an `AMQPUri`, pinning the result type so the
    /// assertions below need no annotations.
    fn parse(input: &str) -> Result<AMQPUri, String> {
        input.parse()
    }

    // A URI without any path targets the default vhost.
    #[test]
    fn test_parse_amqp_no_path() {
        assert_eq!(parse("amqp://localhost"), Ok(AMQPUri::default()));
    }

    // "%2f" is the percent-encoded form of the "/" vhost.
    #[test]
    fn test_parse_amqp() {
        assert_eq!(parse("amqp://localhost/%2f"), Ok(AMQPUri::default()));
    }

    // A bare "/" path yields an empty vhost, and amqps implies port 5671.
    #[test]
    fn test_parse_amqps() {
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQPS,
            authority: AMQPAuthority {
                port: 5671,
                ..AMQPAuthority::default()
            },
            vhost: String::new(),
            ..AMQPUri::default()
        };
        assert_eq!(parse("amqps://localhost/"), Ok(expected));
    }

    // Unknown query parameters are ignored.
    #[test]
    fn test_parse_amqps_with_creds() {
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQPS,
            authority: AMQPAuthority {
                userinfo: AMQPUserInfo {
                    username: String::from("user"),
                    password: String::from("pass"),
                },
                host: String::from("hostname"),
                port: 5671,
            },
            vhost: String::from("v"),
            ..AMQPUri::default()
        };
        assert_eq!(parse("amqps://user:pass@hostname/v?foo=bar"), Ok(expected));
    }

    // Every percent-encoded component is decoded.
    #[test]
    fn test_parse_amqps_with_creds_percent() {
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQP,
            authority: AMQPAuthority {
                userinfo: AMQPUserInfo {
                    username: String::from("usera"),
                    password: String::from("apass"),
                },
                host: String::from("hoast"),
                port: 10000,
            },
            vhost: String::from("v/host"),
            ..AMQPUri::default()
        };
        assert_eq!(
            parse("amqp://user%61:%61pass@ho%61st:10000/v%2fhost"),
            Ok(expected)
        );
    }

    #[test]
    fn test_parse_with_heartbeat_frame_max() {
        let expected = AMQPUri {
            query: AMQPQueryString {
                frame_max: Some(64),
                heartbeat: Some(42),
                connection_timeout: Some(30000),
                ..AMQPQueryString::default()
            },
            ..AMQPUri::default()
        };
        assert_eq!(
            parse("amqp://localhost/%2f?heartbeat=42&frame_max=64&connection_timeout=30000"),
            Ok(expected)
        );
    }

    #[test]
    fn test_url_with_no_base() {
        assert_eq!(
            parse("foo"),
            Err(String::from("relative URL without a base"))
        );
    }

    #[test]
    fn test_invalid_url() {
        assert_eq!(parse("foo:bar"), Err(String::from("Invalid URL: 'foo:bar'")));
    }

    #[test]
    fn test_invalid_scheme() {
        assert_eq!(
            parse("http://localhost/"),
            Err(String::from("Invalid AMQP scheme: http"))
        );
    }
}