1#![deny(missing_docs)]
2#![allow(clippy::result_large_err)]
3
4use amq_protocol_uri::{AMQPScheme, AMQPUri};
10use async_trait::async_trait;
11use cfg_if::cfg_if;
12use executor_trait::BlockingExecutor;
13use reactor_trait::TcpReactor;
14use std::{io, ops::Deref, time::Duration};
15use tracing::trace;
16
17pub use tcp_stream::{
19 AsyncTcpStream, HandshakeError, HandshakeResult, Identity, MidHandshakeTlsStream,
20 OwnedIdentity, OwnedTLSConfig, TLSConfig, TcpStream,
21};
22
23#[cfg(feature = "native-tls")]
24pub use tcp_stream::NativeTlsConnector;
25
26#[cfg(feature = "openssl")]
27pub use tcp_stream::OpenSslConnector;
28
29#[cfg(feature = "rustls-common")]
30pub use tcp_stream::{RustlsConnector, RustlsConnectorConfig};
31
32#[async_trait]
34pub trait AMQPUriTcpExt {
35 fn connect(&self) -> HandshakeResult
37 where
38 Self: Sized,
39 {
40 self.connect_with_config(TLSConfig::default())
41 }
42
43 fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult;
45
46 async fn connect_async<R: TcpReactor + Send + Sync, E: Deref + Send + Sync>(
48 &self,
49 reactor: R,
50 executor: E,
51 ) -> io::Result<AsyncTcpStream>
52 where
53 Self: Sized,
54 E::Target: BlockingExecutor + Send + Sync,
55 {
56 self.connect_with_config_async(TLSConfig::default(), reactor, executor)
57 .await
58 }
59
60 async fn connect_with_config_async<R: TcpReactor + Send + Sync, E: Deref + Send + Sync>(
62 &self,
63 config: TLSConfig<'_, '_, '_>,
64 reactor: R,
65 executor: E,
66 ) -> io::Result<AsyncTcpStream>
67 where
68 E::Target: BlockingExecutor + Send + Sync;
69}
70
71#[async_trait]
72impl AMQPUriTcpExt for AMQPUri {
73 fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult {
74 cfg_if! {
75 if #[cfg(feature = "hickory-dns")] {
76 use hickory_to_socket_addrs::HickoryToSocketAddrs;
77
78 let uri = HickoryToSocketAddrs::new(&self.authority.host, self.authority.port);
79 } else {
80 let uri = (self.authority.host.as_str(), self.authority.port);
81 }
82 }
83 trace!(uri = ?uri, "Connecting.");
84 let stream = if let Some(timeout) = self.query.connection_timeout {
85 TcpStream::connect_timeout(uri, Duration::from_millis(timeout))
86 } else {
87 TcpStream::connect(uri)
88 }?;
89 let stream = match self.scheme {
90 AMQPScheme::AMQP => stream,
91 AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config)?,
92 };
93 stream.set_nonblocking(true)?;
94 Ok(stream)
95 }
96
97 async fn connect_with_config_async<R: TcpReactor + Send + Sync, E: Deref + Send + Sync>(
98 &self,
99 config: TLSConfig<'_, '_, '_>,
100 reactor: R,
101 executor: E,
102 ) -> io::Result<AsyncTcpStream>
103 where
104 E::Target: BlockingExecutor + Send + Sync,
105 {
106 cfg_if! {
107 if #[cfg(feature = "hickory-dns")] {
108 use hickory_to_socket_addrs::HickoryToSocketAddrs;
109
110 let uri = HickoryToSocketAddrs::new(self.authority.host.to_owned(), self.authority.port);
111 trace!(uri = ?uri, "Connecting.");
112 drop(executor);
113 } else {
114 let uri = (self.authority.host.to_owned(), self.authority.port);
115 trace!(uri = ?uri, "Connecting.");
116 let uri = (executor, uri);
117 }
118 }
119 let stream = AsyncTcpStream::connect(reactor, uri).await?;
120 let stream = match self.scheme {
121 AMQPScheme::AMQP => stream,
122 AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config).await?,
123 };
124 Ok(stream)
125 }
126}