amq_protocol_tcp/
lib.rs

1#![deny(missing_docs)]
2
3//! # AMQP URI TCP connection handling
4//!
//! amq-protocol-tcp is a library providing tools to help
//! connect to an AMQP URI
7
8use amq_protocol_uri::{AMQPScheme, AMQPUri};
9use std::time::Duration;
10use tracing::trace;
11
/// Re-export the `tcp_stream` stream, handshake, identity and TLS configuration types
13pub use tcp_stream::{
14    HandshakeError, HandshakeResult, Identity, MidHandshakeTlsStream, OwnedIdentity,
15    OwnedTLSConfig, TLSConfig, TcpStream,
16};
17
18#[cfg(feature = "native-tls")]
19pub use tcp_stream::NativeTlsConnector;
20
21#[cfg(feature = "openssl")]
22pub use tcp_stream::OpenSslConnector;
23
24#[cfg(feature = "rustls-connector")]
25pub use tcp_stream::{RustlsConnector, RustlsConnectorConfig};
26
27/// Trait providing a method to connect to a TcpStream
28pub trait AMQPUriTcpExt {
29    /// connect to a TcpStream
30    fn connect(&self) -> HandshakeResult
31    where
32        Self: Sized,
33    {
34        self.connect_with_config(TLSConfig::default())
35    }
36
37    /// connect to a TcpStream with the given configuration
38    fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult;
39}
40
41impl AMQPUriTcpExt for AMQPUri {
42    fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult {
43        let uri = format!("{}:{}", self.authority.host, self.authority.port);
44        trace!(uri = %uri, "Connecting.");
45        let stream = if let Some(timeout) = self.query.connection_timeout {
46            TcpStream::connect_timeout(uri, Duration::from_millis(timeout))
47        } else {
48            TcpStream::connect(uri)
49        }?;
50        let stream = match self.scheme {
51            AMQPScheme::AMQP => stream,
52            AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config)?,
53        };
54        stream.set_nonblocking(true)?;
55        Ok(stream)
56    }
57}