1#![deny(missing_docs)]
2#![allow(clippy::result_large_err)]
3
4use amq_protocol_uri::{AMQPScheme, AMQPUri};
10use async_trait::async_trait;
11use cfg_if::cfg_if;
12use executor_trait::BlockingExecutor;
13use reactor_trait::TcpReactor;
14use std::{io, ops::Deref, time::Duration};
15use tracing::trace;
16
17pub use tcp_stream::{
19 AsyncTcpStream, HandshakeError, HandshakeResult, Identity, MidHandshakeTlsStream,
20 OwnedIdentity, OwnedTLSConfig, TLSConfig, TcpStream,
21};
22
23#[cfg(feature = "native-tls")]
24pub use tcp_stream::NativeTlsConnector;
25
26#[cfg(feature = "openssl")]
27pub use tcp_stream::OpenSslConnector;
28
29#[cfg(feature = "rustls-common")]
30pub use tcp_stream::{RustlsConnector, RustlsConnectorConfig};
31
32#[async_trait]
34pub trait AMQPUriTcpExt {
35 fn connect(&self) -> HandshakeResult
37 where
38 Self: Sized,
39 {
40 self.connect_with_config(TLSConfig::default())
41 }
42
43 fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult;
45
46 async fn connect_async<R: Deref + Send + Sync, E: Deref + Send + Sync>(
48 &self,
49 reactor: R,
50 executor: E,
51 ) -> io::Result<AsyncTcpStream>
52 where
53 Self: Sized,
54 R::Target: TcpReactor + Send + Sync,
55 E::Target: BlockingExecutor + Send + Sync,
56 {
57 self.connect_with_config_async(TLSConfig::default(), reactor, executor)
58 .await
59 }
60
61 async fn connect_with_config_async<R: Deref + Send + Sync, E: Deref + Send + Sync>(
63 &self,
64 config: TLSConfig<'_, '_, '_>,
65 reactor: R,
66 executor: E,
67 ) -> io::Result<AsyncTcpStream>
68 where
69 R::Target: TcpReactor + Send + Sync,
70 E::Target: BlockingExecutor + Send + Sync;
71}
72
73#[async_trait]
74impl AMQPUriTcpExt for AMQPUri {
75 fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult {
76 cfg_if! {
77 if #[cfg(feature = "hickory-dns")] {
78 use hickory_to_socket_addrs::HickoryToSocketAddrs;
79
80 let uri = HickoryToSocketAddrs::new(&self.authority.host, self.authority.port);
81 } else {
82 let uri = (self.authority.host.as_str(), self.authority.port);
83 }
84 }
85 trace!(uri = ?uri, "Connecting.");
86 let stream = if let Some(timeout) = self.query.connection_timeout {
87 TcpStream::connect_timeout(uri, Duration::from_millis(timeout))
88 } else {
89 TcpStream::connect(uri)
90 }?;
91 let stream = match self.scheme {
92 AMQPScheme::AMQP => stream,
93 AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config)?,
94 };
95 stream.set_nonblocking(true)?;
96 Ok(stream)
97 }
98
99 async fn connect_with_config_async<R: Deref + Send + Sync, E: Deref + Send + Sync>(
100 &self,
101 config: TLSConfig<'_, '_, '_>,
102 reactor: R,
103 executor: E,
104 ) -> io::Result<AsyncTcpStream>
105 where
106 R::Target: TcpReactor + Send + Sync,
107 E::Target: BlockingExecutor + Send + Sync,
108 {
109 cfg_if! {
110 if #[cfg(feature = "hickory-dns")] {
111 use hickory_to_socket_addrs::HickoryToSocketAddrs;
112
113 let uri = HickoryToSocketAddrs::new(self.authority.host.to_owned(), self.authority.port);
114 trace!(uri = ?uri, "Connecting.");
115 drop(executor);
116 } else {
117 let uri = (self.authority.host.to_owned(), self.authority.port);
118 trace!(uri = ?uri, "Connecting.");
119 let uri = (executor, uri);
120 }
121 }
122 let stream = AsyncTcpStream::connect(reactor, uri).await?;
123 let stream = match self.scheme {
124 AMQPScheme::AMQP => stream,
125 AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config).await?,
126 };
127 Ok(stream)
128 }
129}