1#![deny(missing_docs)]
2#![allow(clippy::result_large_err)]
3
4use amq_protocol_uri::{AMQPScheme, AMQPUri};
10use async_rs::{Runtime, traits::*};
11use cfg_if::cfg_if;
12use std::{io, time::Duration};
13use tracing::trace;
14
15pub use tcp_stream::{
17 AsyncTcpStream, HandshakeError, HandshakeResult, Identity, MidHandshakeTlsStream,
18 OwnedIdentity, OwnedTLSConfig, TLSConfig, TcpStream,
19};
20
21#[cfg(feature = "native-tls")]
22pub use tcp_stream::NativeTlsConnector;
23
24#[cfg(feature = "openssl")]
25pub use tcp_stream::OpensslConnector;
26
27#[cfg(feature = "rustls-common")]
28pub use tcp_stream::{RustlsConnector, RustlsConnectorConfig};
29
30pub trait AMQPUriTcpExt {
32 fn connect(&self) -> HandshakeResult
34 where
35 Self: Sized,
36 {
37 self.connect_with_config(TLSConfig::default())
38 }
39
40 fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult;
42
43 fn connect_async<RK: RuntimeKit + Send + Sync>(
45 &self,
46 runtime: &Runtime<RK>,
47 ) -> impl Future<Output = io::Result<AsyncTcpStream<<RK as Reactor>::TcpStream>>>
48 where
49 Self: Sized,
50 {
51 self.connect_with_config_async(TLSConfig::default(), runtime)
52 }
53
54 fn connect_with_config_async<RK: RuntimeKit + Send + Sync>(
56 &self,
57 config: TLSConfig<'_, '_, '_>,
58 runtime: &Runtime<RK>,
59 ) -> impl Future<Output = io::Result<AsyncTcpStream<<RK as Reactor>::TcpStream>>>
60 where
61 Self: Sized;
62}
63
64impl AMQPUriTcpExt for AMQPUri {
65 fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult {
66 cfg_if! {
67 if #[cfg(feature = "hickory-dns")] {
68 let uri = async_rs::HickoryToSocketAddrs::new(self.authority.host.clone(), self.authority.port);
69 } else {
70 let uri = (self.authority.host.as_str(), self.authority.port);
71 }
72 }
73 trace!(uri = ?uri, "Connecting.");
74 let stream = if let Some(timeout) = self.query.connection_timeout {
75 TcpStream::connect_timeout(uri, Duration::from_millis(timeout))
76 } else {
77 TcpStream::connect(uri)
78 }?;
79 let stream = match self.scheme {
80 AMQPScheme::AMQP => stream,
81 AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config)?,
82 };
83 stream.set_nonblocking(true)?;
84 Ok(stream)
85 }
86
87 fn connect_with_config_async<RK: RuntimeKit + Send + Sync>(
88 &self,
89 config: TLSConfig<'_, '_, '_>,
90 runtime: &Runtime<RK>,
91 ) -> impl Future<Output = io::Result<AsyncTcpStream<<RK as Reactor>::TcpStream>>>
92 where
93 Self: Sized,
94 {
95 cfg_if! {
96 if #[cfg(feature = "hickory-dns")] {
97 let uri = async_rs::HickoryToSocketAddrs::new(self.authority.host.clone(), self.authority.port);
98 } else {
99 let uri = runtime.to_socket_addrs((self.authority.host.clone(), self.authority.port));
100 }
101 }
102 trace!(uri = ?uri, "Connecting.");
103 async move {
104 let stream = AsyncTcpStream::connect(runtime, uri).await?;
105 let stream = match self.scheme {
106 AMQPScheme::AMQP => stream,
107 AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config).await?,
108 };
109 Ok(stream)
110 }
111 }
112}