1#![deny(missing_docs)]
2
3use amq_protocol_uri::{AMQPScheme, AMQPUri};
9use std::time::Duration;
10use tracing::trace;
11
12pub use tcp_stream::{
14 HandshakeError, HandshakeResult, Identity, MidHandshakeTlsStream, OwnedIdentity,
15 OwnedTLSConfig, TLSConfig, TcpStream,
16};
17
18#[cfg(feature = "native-tls")]
19pub use tcp_stream::NativeTlsConnector;
20
21#[cfg(feature = "openssl")]
22pub use tcp_stream::OpenSslConnector;
23
24#[cfg(feature = "rustls-connector")]
25pub use tcp_stream::{RustlsConnector, RustlsConnectorConfig};
26
27pub trait AMQPUriTcpExt {
29 fn connect(&self) -> HandshakeResult
31 where
32 Self: Sized,
33 {
34 self.connect_with_config(TLSConfig::default())
35 }
36
37 fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult;
39}
40
41impl AMQPUriTcpExt for AMQPUri {
42 fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult {
43 let uri = format!("{}:{}", self.authority.host, self.authority.port);
44 trace!(uri = %uri, "Connecting.");
45 let stream = if let Some(timeout) = self.query.connection_timeout {
46 TcpStream::connect_timeout(uri, Duration::from_millis(timeout))
47 } else {
48 TcpStream::connect(uri)
49 }?;
50 let stream = match self.scheme {
51 AMQPScheme::AMQP => stream,
52 AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config)?,
53 };
54 stream.set_nonblocking(true)?;
55 Ok(stream)
56 }
57}