amq_protocol_uri/
lib.rs

1#![deny(missing_docs)]
2
3//! # AMQP URI manipulation library
4//!
//! amq-protocol-uri is a library providing tools to help manage
//! AMQP URIs
7
8use amq_protocol_types::{ChannelId, FrameSize, Heartbeat};
9use url::Url;
10
11use std::{fmt, num::ParseIntError, str::FromStr};
12
13/// An AMQP Uri
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct AMQPUri {
16    /// The scheme used by the AMQP connection
17    pub scheme: AMQPScheme,
18    /// The connection information
19    pub authority: AMQPAuthority,
20    /// The target vhost
21    pub vhost: String,
22    /// The optional query string to pass parameters to the server
23    pub query: AMQPQueryString,
24}
25
/// The URI scheme, which selects how the AMQP connection is transported
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub enum AMQPScheme {
    /// Plain (unencrypted) AMQP; this is the default scheme
    #[default]
    AMQP,
    /// AMQP encrypted over TLS
    AMQPS,
}
35
36impl FromStr for AMQPScheme {
37    type Err = String;
38
39    fn from_str(s: &str) -> Result<Self, Self::Err> {
40        match s {
41            "amqp" => Ok(AMQPScheme::AMQP),
42            "amqps" => Ok(AMQPScheme::AMQPS),
43            s => Err(format!("Invalid AMQP scheme: {s}")),
44        }
45    }
46}
47
48/// The connection information
49#[derive(Clone, Debug, PartialEq, Eq)]
50pub struct AMQPAuthority {
51    /// The credentials used to connect to the server
52    pub userinfo: AMQPUserInfo,
53    /// The server's host
54    pub host: String,
55    /// The port the server listens on
56    pub port: u16,
57}
58
/// The username/password pair used to authenticate against the server
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct AMQPUserInfo {
    /// Name of the user
    pub username: String,
    /// Password of the user
    pub password: String,
}
67
68/// The optional query string to pass parameters to the server
69#[derive(Clone, Debug, Default, PartialEq, Eq)]
70pub struct AMQPQueryString {
71    /// The maximum size of an AMQP Frame
72    pub frame_max: Option<FrameSize>,
73    /// The maximum number of open channels
74    pub channel_max: Option<ChannelId>,
75    /// The maximum time between two heartbeats
76    pub heartbeat: Option<Heartbeat>,
77    /// The maximum time to wait (in milliseconds) for the connection to succeed
78    pub connection_timeout: Option<u64>,
79    /// The SASL mechanism used for authentication
80    pub auth_mechanism: Option<SASLMechanism>,
81    // Fields available in Erlang implementation for SSL settings:
82    // cacertfile, certfile, keyfile, verify, fail_if_no_peer_cert, password,
83    // server_name_indication, depth
84}
85
/// The SASL authentication mechanisms supported by RabbitMQ
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum SASLMechanism {
    /// Legacy mechanism, only kept for backward compatibility
    AMQPlain,
    /// Anonymous authentication, if the RabbitMQ server supports it
    Anonymous,
    /// Let the transport handle all authentication instead of the RabbitMQ server
    External,
    /// Plain login; this is the default and should be supported everywhere
    #[default]
    Plain,
    /// Demo of the RabbitMQ SecureOk mechanism, with the same level of security as Plain
    RabbitCrDemo,
}
101
102impl SASLMechanism {
103    /// Get the name of the SASL mechanism as str
104    pub fn name(&self) -> &'static str {
105        match self {
106            SASLMechanism::AMQPlain => "AMQPLAIN",
107            SASLMechanism::Anonymous => "ANONYMOUS",
108            SASLMechanism::External => "EXTERNAL",
109            SASLMechanism::Plain => "PLAIN",
110            SASLMechanism::RabbitCrDemo => "RABBIT-CR-DEMO",
111        }
112    }
113}
114
115impl fmt::Display for SASLMechanism {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        f.write_str(self.name())
118    }
119}
120
121impl FromStr for SASLMechanism {
122    type Err = String;
123
124    fn from_str(s: &str) -> Result<Self, Self::Err> {
125        match s.to_lowercase().as_str() {
126            "amqplain" => Ok(SASLMechanism::AMQPlain),
127            "anonymous" => Ok(SASLMechanism::Anonymous),
128            "external" => Ok(SASLMechanism::External),
129            "plain" => Ok(SASLMechanism::Plain),
130            "rabbit-cr-demo" => Ok(SASLMechanism::RabbitCrDemo),
131            s => Err(format!("Invalid SASL mechanism: {s}")),
132        }
133    }
134}
135
136fn percent_decode(s: &str) -> Result<String, String> {
137    percent_encoding::percent_decode(s.as_bytes())
138        .decode_utf8()
139        .map(|s| s.to_string())
140        .map_err(|e| e.to_string())
141}
142
143impl Default for AMQPUri {
144    fn default() -> Self {
145        AMQPUri {
146            scheme: Default::default(),
147            authority: Default::default(),
148            vhost: "/".to_string(),
149            query: Default::default(),
150        }
151    }
152}
153
154fn int_queryparam<T: FromStr<Err = ParseIntError>>(
155    url: &Url,
156    param: &str,
157) -> Result<Option<T>, String> {
158    url.query_pairs()
159        .find(|(key, _)| key == param)
160        .map_or(Ok(None), |(_, ref value)| value.parse::<T>().map(Some))
161        .map_err(|e: ParseIntError| e.to_string())
162}
163
164impl FromStr for AMQPUri {
165    type Err = String;
166
167    fn from_str(s: &str) -> Result<Self, Self::Err> {
168        let url = Url::parse(s).map_err(|e| e.to_string())?;
169        if url.cannot_be_a_base() {
170            return Err(format!("Invalid URL: '{s}'"));
171        }
172        let default = AMQPUri::default();
173        let scheme = url.scheme().parse::<AMQPScheme>()?;
174        let username = match url.username() {
175            "" => default.authority.userinfo.username,
176            username => percent_decode(username)?,
177        };
178        let password = url
179            .password()
180            .map_or(Ok(default.authority.userinfo.password), percent_decode)?;
181        let host = url
182            .domain()
183            .map_or(Ok(default.authority.host), percent_decode)?;
184        let port = url.port().unwrap_or_else(|| scheme.default_port());
185        let vhost = percent_decode(url.path().get(1..).unwrap_or("/"))?;
186        let frame_max = int_queryparam(&url, "frame_max")?;
187        let channel_max = int_queryparam(&url, "channel_max")?;
188        let heartbeat = int_queryparam(&url, "heartbeat")?;
189        let connection_timeout = int_queryparam(&url, "connection_timeout")?;
190        let auth_mechanism = url
191            .query_pairs()
192            .find(|(key, _)| key == "auth_mechanism")
193            .map_or(Ok(None), |(_, ref value)| value.parse().map(Some))?;
194
195        Ok(AMQPUri {
196            scheme,
197            authority: AMQPAuthority {
198                userinfo: AMQPUserInfo { username, password },
199                host,
200                port,
201            },
202            vhost,
203            query: AMQPQueryString {
204                frame_max,
205                channel_max,
206                heartbeat,
207                connection_timeout,
208                auth_mechanism,
209            },
210        })
211    }
212}
213
214impl AMQPScheme {
215    /// The default port for this scheme
216    pub fn default_port(&self) -> u16 {
217        match *self {
218            AMQPScheme::AMQP => 5672,
219            AMQPScheme::AMQPS => 5671,
220        }
221    }
222}
223
224impl Default for AMQPAuthority {
225    fn default() -> Self {
226        AMQPAuthority {
227            userinfo: Default::default(),
228            host: "localhost".to_string(),
229            port: AMQPScheme::default().default_port(),
230        }
231    }
232}
233
234impl Default for AMQPUserInfo {
235    fn default() -> Self {
236        AMQPUserInfo {
237            username: "guest".to_string(),
238            password: "guest".to_string(),
239        }
240    }
241}
242
#[cfg(test)]
mod test {
    use super::*;

    /// Parse `input` as an [`AMQPUri`], exposing the error as a `String`.
    fn parse(input: &str) -> Result<AMQPUri, String> {
        input.parse::<AMQPUri>()
    }

    /// A URI without any path targets the default vhost.
    #[test]
    fn test_parse_amqp_no_path() {
        assert_eq!(parse("amqp://localhost"), Ok(AMQPUri::default()));
    }

    /// `%2f` decodes to `/`, which is the default vhost.
    #[test]
    fn test_parse_amqp() {
        assert_eq!(parse("amqp://localhost/%2f"), Ok(AMQPUri::default()));
    }

    /// `amqps` selects port 5671 and a bare trailing `/` selects the empty vhost.
    #[test]
    fn test_parse_amqps() {
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQPS,
            authority: AMQPAuthority {
                port: 5671,
                ..AMQPAuthority::default()
            },
            vhost: String::new(),
            ..AMQPUri::default()
        };
        assert_eq!(parse("amqps://localhost/"), Ok(expected));
    }

    /// Credentials, host and vhost are picked up; unknown query parameters are ignored.
    #[test]
    fn test_parse_amqps_with_creds() {
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQPS,
            authority: AMQPAuthority {
                userinfo: AMQPUserInfo {
                    username: String::from("user"),
                    password: String::from("pass"),
                },
                host: String::from("hostname"),
                port: 5671,
            },
            vhost: String::from("v"),
            ..AMQPUri::default()
        };
        assert_eq!(parse("amqps://user:pass@hostname/v?foo=bar"), Ok(expected));
    }

    /// Percent-encoded credentials, host and vhost are decoded.
    #[test]
    fn test_parse_amqps_with_creds_percent() {
        let expected = AMQPUri {
            scheme: AMQPScheme::AMQP,
            authority: AMQPAuthority {
                userinfo: AMQPUserInfo {
                    username: String::from("usera"),
                    password: String::from("apass"),
                },
                host: String::from("hoast"),
                port: 10000,
            },
            vhost: String::from("v/host"),
            ..AMQPUri::default()
        };
        assert_eq!(
            parse("amqp://user%61:%61pass@ho%61st:10000/v%2fhost"),
            Ok(expected)
        );
    }

    /// Recognised integer query parameters end up in the query string struct.
    #[test]
    fn test_parse_with_heartbeat_frame_max() {
        let expected = AMQPUri {
            query: AMQPQueryString {
                frame_max: Some(64),
                heartbeat: Some(42),
                connection_timeout: Some(30000),
                ..AMQPQueryString::default()
            },
            ..AMQPUri::default()
        };
        assert_eq!(
            parse("amqp://localhost/%2f?heartbeat=42&frame_max=64&connection_timeout=30000"),
            Ok(expected)
        );
    }

    /// A string without a scheme is rejected by the URL parser itself.
    #[test]
    fn test_url_with_no_base() {
        assert_eq!(
            parse("foo"),
            Err(String::from("relative URL without a base"))
        );
    }

    /// A URL that cannot be a base is rejected.
    #[test]
    fn test_invalid_url() {
        assert_eq!(parse("foo:bar"), Err(String::from("Invalid URL: 'foo:bar'")));
    }

    /// Only the `amqp` and `amqps` schemes are accepted.
    #[test]
    fn test_invalid_scheme() {
        assert_eq!(
            parse("http://localhost/"),
            Err(String::from("Invalid AMQP scheme: http"))
        );
    }
}
350        assert_eq!(uri, Err("Invalid AMQP scheme: http".to_string()));
351    }
352}