amq_protocol_tcp/
lib.rs

1#![deny(missing_docs)]
2#![allow(clippy::result_large_err)]
3
4//! # AMQP URI TCP connection handling
5//!
//! amq-protocol-tcp is a library that provides tools for
//! connecting to an AMQP URI over TCP.
8
9use amq_protocol_uri::{AMQPScheme, AMQPUri};
10use async_trait::async_trait;
11use cfg_if::cfg_if;
12use executor_trait::BlockingExecutor;
13use reactor_trait::TcpReactor;
14use std::{io, ops::Deref, time::Duration};
15use tracing::trace;
16
17/// Re-export TcpStream
18pub use tcp_stream::{
19    AsyncTcpStream, HandshakeError, HandshakeResult, Identity, MidHandshakeTlsStream,
20    OwnedIdentity, OwnedTLSConfig, TLSConfig, TcpStream,
21};
22
23#[cfg(feature = "native-tls")]
24pub use tcp_stream::NativeTlsConnector;
25
26#[cfg(feature = "openssl")]
27pub use tcp_stream::OpenSslConnector;
28
29#[cfg(feature = "rustls-common")]
30pub use tcp_stream::{RustlsConnector, RustlsConnectorConfig};
31
32/// Trait providing a method to connect to a TcpStream
33#[async_trait]
34pub trait AMQPUriTcpExt {
35    /// connect to a TcpStream
36    fn connect(&self) -> HandshakeResult
37    where
38        Self: Sized,
39    {
40        self.connect_with_config(TLSConfig::default())
41    }
42
43    /// connect to a TcpStream with the given configuration
44    fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult;
45
46    /// connect to a TcpStream
47    async fn connect_async<R: TcpReactor + Send + Sync, E: Deref + Send + Sync>(
48        &self,
49        reactor: R,
50        executor: E,
51    ) -> io::Result<AsyncTcpStream>
52    where
53        Self: Sized,
54        E::Target: BlockingExecutor + Send + Sync,
55    {
56        self.connect_with_config_async(TLSConfig::default(), reactor, executor)
57            .await
58    }
59
60    /// connect to a TcpStream with the given configuration
61    async fn connect_with_config_async<R: TcpReactor + Send + Sync, E: Deref + Send + Sync>(
62        &self,
63        config: TLSConfig<'_, '_, '_>,
64        reactor: R,
65        executor: E,
66    ) -> io::Result<AsyncTcpStream>
67    where
68        E::Target: BlockingExecutor + Send + Sync;
69}
70
71#[async_trait]
72impl AMQPUriTcpExt for AMQPUri {
73    fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult {
74        cfg_if! {
75            if #[cfg(feature = "hickory-dns")] {
76                use hickory_to_socket_addrs::HickoryToSocketAddrs;
77
78                let uri = HickoryToSocketAddrs::new(&self.authority.host, self.authority.port);
79            } else {
80                let uri = (self.authority.host.as_str(), self.authority.port);
81            }
82        }
83        trace!(uri = ?uri, "Connecting.");
84        let stream = if let Some(timeout) = self.query.connection_timeout {
85            TcpStream::connect_timeout(uri, Duration::from_millis(timeout))
86        } else {
87            TcpStream::connect(uri)
88        }?;
89        let stream = match self.scheme {
90            AMQPScheme::AMQP => stream,
91            AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config)?,
92        };
93        stream.set_nonblocking(true)?;
94        Ok(stream)
95    }
96
97    async fn connect_with_config_async<R: TcpReactor + Send + Sync, E: Deref + Send + Sync>(
98        &self,
99        config: TLSConfig<'_, '_, '_>,
100        reactor: R,
101        executor: E,
102    ) -> io::Result<AsyncTcpStream>
103    where
104        E::Target: BlockingExecutor + Send + Sync,
105    {
106        cfg_if! {
107            if #[cfg(feature = "hickory-dns")] {
108                use hickory_to_socket_addrs::HickoryToSocketAddrs;
109
110                let uri = HickoryToSocketAddrs::new(self.authority.host.to_owned(), self.authority.port);
111                trace!(uri = ?uri, "Connecting.");
112                drop(executor);
113            } else {
114                let uri = (self.authority.host.to_owned(), self.authority.port);
115                trace!(uri = ?uri, "Connecting.");
116                let uri = (executor, uri);
117            }
118        }
119        let stream = AsyncTcpStream::connect(reactor, uri).await?;
120        let stream = match self.scheme {
121            AMQPScheme::AMQP => stream,
122            AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config).await?,
123        };
124        Ok(stream)
125    }
126}