amq_protocol_uri/
lib.rs

1#![deny(missing_docs)]
2
3//! # AMQP URI manipulation library
4//!
5//! amq-protocol-uri is a library aiming at providing tools to help
6//! managing AMQP URIs
7
8use amq_protocol_types::{ChannelId, FrameSize, Heartbeat};
9use url::Url;
10
11use std::{fmt, num::ParseIntError, str::FromStr};
12
13/// An AMQP Uri
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct AMQPUri {
16    /// The scheme used by the AMQP connection
17    pub scheme: AMQPScheme,
18    /// The connection information
19    pub authority: AMQPAuthority,
20    /// The target vhost
21    pub vhost: String,
22    /// The optional query string to pass parameters to the server
23    pub query: AMQPQueryString,
24}
25
/// The scheme used by the AMQP connection
///
/// This is a fieldless tag, so it is `Copy` and can be passed around by value.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum AMQPScheme {
    /// Plain AMQP (this is the default scheme)
    #[default]
    AMQP,
    /// Encrypted AMQP over TLS
    AMQPS,
}
35
36impl FromStr for AMQPScheme {
37    type Err = String;
38
39    fn from_str(s: &str) -> Result<Self, Self::Err> {
40        match s {
41            "amqp" => Ok(AMQPScheme::AMQP),
42            "amqps" => Ok(AMQPScheme::AMQPS),
43            s => Err(format!("Invalid AMQP scheme: {s}")),
44        }
45    }
46}
47
48/// The connection information
49#[derive(Clone, Debug, PartialEq, Eq)]
50pub struct AMQPAuthority {
51    /// The credentials used to connect to the server
52    pub userinfo: AMQPUserInfo,
53    /// The server's host
54    pub host: String,
55    /// The port the server listens on
56    pub port: u16,
57}
58
/// Username and password used to authenticate against the server
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct AMQPUserInfo {
    /// Name of the user to log in as
    pub username: String,
    /// Password of that user
    pub password: String,
}
67
68/// The optional query string to pass parameters to the server
69#[derive(Clone, Debug, Default, PartialEq, Eq)]
70pub struct AMQPQueryString {
71    /// The maximum size of an AMQP Frame
72    pub frame_max: Option<FrameSize>,
73    /// The maximum number of open channels
74    pub channel_max: Option<ChannelId>,
75    /// The maximum time between two heartbeats
76    pub heartbeat: Option<Heartbeat>,
77    /// The maximum time to wait (in milliseconds) for the connection to succeed
78    pub connection_timeout: Option<u64>,
79    /// The SASL mechanism used for authentication
80    pub auth_mechanism: Option<SASLMechanism>,
81    // Fields available in Erlang implementation for SSL settings:
82    // cacertfile, certfile, keyfile, verify, fail_if_no_peer_cert, password,
83    // server_name_indication, depth
84}
85
/// SASL authentication mechanisms that RabbitMQ supports
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum SASLMechanism {
    /// Legacy mechanism, only kept for backward compatibility
    AMQPlain,
    /// Authentication is fully delegated to the transport rather than to the RabbitMQ server
    External,
    /// Plain login; the default, expected to be supported everywhere
    #[default]
    Plain,
    /// Demo of the RabbitMQ SecureOk mechanism, no more secure than Plain
    RabbitCrDemo,
}
99
100impl fmt::Display for SASLMechanism {
101    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102        f.write_str(match self {
103            SASLMechanism::AMQPlain => "AMQPLAIN",
104            SASLMechanism::External => "EXTERNAL",
105            SASLMechanism::Plain => "PLAIN",
106            SASLMechanism::RabbitCrDemo => "RABBIT-CR-DEMO",
107        })
108    }
109}
110
111impl FromStr for SASLMechanism {
112    type Err = String;
113
114    fn from_str(s: &str) -> Result<Self, Self::Err> {
115        match s.to_lowercase().as_str() {
116            "amqplain" => Ok(SASLMechanism::AMQPlain),
117            "external" => Ok(SASLMechanism::External),
118            "plain" => Ok(SASLMechanism::Plain),
119            "rabbit-cr-demo" => Ok(SASLMechanism::RabbitCrDemo),
120            s => Err(format!("Invalid SASL mechanism: {s}")),
121        }
122    }
123}
124
125fn percent_decode(s: &str) -> Result<String, String> {
126    percent_encoding::percent_decode(s.as_bytes())
127        .decode_utf8()
128        .map(|s| s.to_string())
129        .map_err(|e| e.to_string())
130}
131
132impl Default for AMQPUri {
133    fn default() -> Self {
134        AMQPUri {
135            scheme: Default::default(),
136            authority: Default::default(),
137            vhost: "/".to_string(),
138            query: Default::default(),
139        }
140    }
141}
142
143fn int_queryparam<T: FromStr<Err = ParseIntError>>(
144    url: &Url,
145    param: &str,
146) -> Result<Option<T>, String> {
147    url.query_pairs()
148        .find(|(key, _)| key == param)
149        .map_or(Ok(None), |(_, ref value)| value.parse::<T>().map(Some))
150        .map_err(|e: ParseIntError| e.to_string())
151}
152
153impl FromStr for AMQPUri {
154    type Err = String;
155
156    fn from_str(s: &str) -> Result<Self, Self::Err> {
157        let url = Url::parse(s).map_err(|e| e.to_string())?;
158        if url.cannot_be_a_base() {
159            return Err(format!("Invalid URL: '{s}'"));
160        }
161        let default = AMQPUri::default();
162        let scheme = url.scheme().parse::<AMQPScheme>()?;
163        let username = match url.username() {
164            "" => default.authority.userinfo.username,
165            username => percent_decode(username)?,
166        };
167        let password = url
168            .password()
169            .map_or(Ok(default.authority.userinfo.password), percent_decode)?;
170        let host = url
171            .domain()
172            .map_or(Ok(default.authority.host), percent_decode)?;
173        let port = url.port().unwrap_or_else(|| scheme.default_port());
174        let vhost = percent_decode(url.path().get(1..).unwrap_or("/"))?;
175        let frame_max = int_queryparam(&url, "frame_max")?;
176        let channel_max = int_queryparam(&url, "channel_max")?;
177        let heartbeat = int_queryparam(&url, "heartbeat")?;
178        let connection_timeout = int_queryparam(&url, "connection_timeout")?;
179        let auth_mechanism = url
180            .query_pairs()
181            .find(|(key, _)| key == "auth_mechanism")
182            .map_or(Ok(None), |(_, ref value)| value.parse().map(Some))?;
183
184        Ok(AMQPUri {
185            scheme,
186            authority: AMQPAuthority {
187                userinfo: AMQPUserInfo { username, password },
188                host,
189                port,
190            },
191            vhost,
192            query: AMQPQueryString {
193                frame_max,
194                channel_max,
195                heartbeat,
196                connection_timeout,
197                auth_mechanism,
198            },
199        })
200    }
201}
202
203impl AMQPScheme {
204    /// The default port for this scheme
205    pub fn default_port(&self) -> u16 {
206        match *self {
207            AMQPScheme::AMQP => 5672,
208            AMQPScheme::AMQPS => 5671,
209        }
210    }
211}
212
213impl Default for AMQPAuthority {
214    fn default() -> Self {
215        AMQPAuthority {
216            userinfo: Default::default(),
217            host: "localhost".to_string(),
218            port: AMQPScheme::default().default_port(),
219        }
220    }
221}
222
223impl Default for AMQPUserInfo {
224    fn default() -> Self {
225        AMQPUserInfo {
226            username: "guest".to_string(),
227            password: "guest".to_string(),
228        }
229    }
230}
231
#[cfg(test)]
mod test {
    use super::*;

    // A URI without any path targets the default "/" vhost.
    #[test]
    fn test_parse_amqp_no_path() {
        let uri = "amqp://localhost".parse();
        assert_eq!(uri, Ok(AMQPUri::default()));
    }

    // "%2f" is the percent-encoded form of the "/" vhost.
    #[test]
    fn test_parse_amqp() {
        let uri = "amqp://localhost/%2f".parse();
        assert_eq!(uri, Ok(AMQPUri::default()));
    }

    // A bare "/" path designates the vhost with an empty name.
    #[test]
    fn test_parse_amqps() {
        let uri = "amqps://localhost/".parse();
        assert_eq!(
            uri,
            Ok(AMQPUri {
                scheme: AMQPScheme::AMQPS,
                authority: AMQPAuthority {
                    port: 5671,
                    ..Default::default()
                },
                vhost: "".to_string(),
                ..Default::default()
            })
        );
    }

    // Unknown query parameters are ignored.
    #[test]
    fn test_parse_amqps_with_creds() {
        let uri = "amqps://user:pass@hostname/v?foo=bar".parse();
        assert_eq!(
            uri,
            Ok(AMQPUri {
                scheme: AMQPScheme::AMQPS,
                authority: AMQPAuthority {
                    userinfo: AMQPUserInfo {
                        username: "user".to_string(),
                        password: "pass".to_string(),
                    },
                    host: "hostname".to_string(),
                    port: 5671,
                },
                vhost: "v".to_string(),
                ..Default::default()
            })
        );
    }

    // Username, password, host and vhost are all percent-decoded.
    #[test]
    fn test_parse_amqp_with_creds_percent() {
        let uri = "amqp://user%61:%61pass@ho%61st:10000/v%2fhost".parse();
        assert_eq!(
            uri,
            Ok(AMQPUri {
                scheme: AMQPScheme::AMQP,
                authority: AMQPAuthority {
                    userinfo: AMQPUserInfo {
                        username: "usera".to_string(),
                        password: "apass".to_string(),
                    },
                    host: "hoast".to_string(),
                    port: 10000,
                },
                vhost: "v/host".to_string(),
                ..Default::default()
            })
        );
    }

    #[test]
    fn test_parse_with_heartbeat_frame_max() {
        let uri = "amqp://localhost/%2f?heartbeat=42&frame_max=64&connection_timeout=30000".parse();
        assert_eq!(
            uri,
            Ok(AMQPUri {
                query: AMQPQueryString {
                    frame_max: Some(64),
                    heartbeat: Some(42),
                    connection_timeout: Some(30000),
                    ..Default::default()
                },
                ..Default::default()
            })
        );
    }

    // The SASL mechanism name is matched case-insensitively.
    #[test]
    fn test_parse_with_channel_max_auth_mechanism() {
        let uri = "amqp://localhost/%2f?channel_max=8&auth_mechanism=EXTERNAL".parse();
        assert_eq!(
            uri,
            Ok(AMQPUri {
                query: AMQPQueryString {
                    channel_max: Some(8),
                    auth_mechanism: Some(SASLMechanism::External),
                    ..Default::default()
                },
                ..Default::default()
            })
        );
    }

    #[test]
    fn test_invalid_auth_mechanism() {
        let uri: Result<AMQPUri, String> = "amqp://localhost/%2f?auth_mechanism=foo".parse();
        assert_eq!(uri, Err("Invalid SASL mechanism: foo".to_string()));
    }

    #[test]
    fn test_invalid_int_param() {
        let uri: Result<AMQPUri, String> = "amqp://localhost/%2f?heartbeat=abc".parse();
        assert!(uri.is_err());
    }

    #[test]
    fn test_url_with_no_base() {
        let uri: Result<AMQPUri, String> = "foo".parse();
        assert_eq!(uri, Err("relative URL without a base".to_string()));
    }

    #[test]
    fn test_invalid_url() {
        let uri: Result<AMQPUri, String> = "foo:bar".parse();
        assert_eq!(uri, Err("Invalid URL: 'foo:bar'".to_string()));
    }

    #[test]
    fn test_invalid_scheme() {
        let uri: Result<AMQPUri, String> = "http://localhost/".parse();
        assert_eq!(uri, Err("Invalid AMQP scheme: http".to_string()));
    }
}