amq_protocol_tcp/
lib.rs

1#![deny(missing_docs)]
2#![allow(clippy::result_large_err)]
3
4//! # AMQP URI TCP connection handling
5//!
6//! amq-protocol-tcp is a library aiming at providing tools to help
7//! connecting to an AMQP URI
8
9use amq_protocol_uri::{AMQPScheme, AMQPUri};
10use async_trait::async_trait;
11use cfg_if::cfg_if;
12use executor_trait::BlockingExecutor;
13use reactor_trait::TcpReactor;
14use std::{io, ops::Deref, time::Duration};
15use tracing::trace;
16
17/// Re-export TcpStream
18pub use tcp_stream::{
19    AsyncTcpStream, HandshakeError, HandshakeResult, Identity, MidHandshakeTlsStream,
20    OwnedIdentity, OwnedTLSConfig, TLSConfig, TcpStream,
21};
22
23#[cfg(feature = "native-tls")]
24pub use tcp_stream::NativeTlsConnector;
25
26#[cfg(feature = "openssl")]
27pub use tcp_stream::OpenSslConnector;
28
29#[cfg(feature = "rustls-common")]
30pub use tcp_stream::{RustlsConnector, RustlsConnectorConfig};
31
32/// Trait providing a method to connect to a TcpStream
33#[async_trait]
34pub trait AMQPUriTcpExt {
35    /// connect to a TcpStream
36    fn connect(&self) -> HandshakeResult
37    where
38        Self: Sized,
39    {
40        self.connect_with_config(TLSConfig::default())
41    }
42
43    /// connect to a TcpStream with the given configuration
44    fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult;
45
46    /// connect to a TcpStream
47    async fn connect_async<R: Deref + Send + Sync, E: Deref + Send + Sync>(
48        &self,
49        reactor: R,
50        executor: E,
51    ) -> io::Result<AsyncTcpStream>
52    where
53        Self: Sized,
54        R::Target: TcpReactor + Send + Sync,
55        E::Target: BlockingExecutor + Send + Sync,
56    {
57        self.connect_with_config_async(TLSConfig::default(), reactor, executor)
58            .await
59    }
60
61    /// connect to a TcpStream with the given configuration
62    async fn connect_with_config_async<R: Deref + Send + Sync, E: Deref + Send + Sync>(
63        &self,
64        config: TLSConfig<'_, '_, '_>,
65        reactor: R,
66        executor: E,
67    ) -> io::Result<AsyncTcpStream>
68    where
69        R::Target: TcpReactor + Send + Sync,
70        E::Target: BlockingExecutor + Send + Sync;
71}
72
73#[async_trait]
74impl AMQPUriTcpExt for AMQPUri {
75    fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult {
76        cfg_if! {
77            if #[cfg(feature = "hickory-dns")] {
78                use hickory_to_socket_addrs::HickoryToSocketAddrs;
79
80                let uri = HickoryToSocketAddrs::new(&self.authority.host, self.authority.port);
81            } else {
82                let uri = (self.authority.host.as_str(), self.authority.port);
83            }
84        }
85        trace!(uri = ?uri, "Connecting.");
86        let stream = if let Some(timeout) = self.query.connection_timeout {
87            TcpStream::connect_timeout(uri, Duration::from_millis(timeout))
88        } else {
89            TcpStream::connect(uri)
90        }?;
91        let stream = match self.scheme {
92            AMQPScheme::AMQP => stream,
93            AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config)?,
94        };
95        stream.set_nonblocking(true)?;
96        Ok(stream)
97    }
98
99    async fn connect_with_config_async<R: Deref + Send + Sync, E: Deref + Send + Sync>(
100        &self,
101        config: TLSConfig<'_, '_, '_>,
102        reactor: R,
103        executor: E,
104    ) -> io::Result<AsyncTcpStream>
105    where
106        R::Target: TcpReactor + Send + Sync,
107        E::Target: BlockingExecutor + Send + Sync,
108    {
109        cfg_if! {
110            if #[cfg(feature = "hickory-dns")] {
111                use hickory_to_socket_addrs::HickoryToSocketAddrs;
112
113                let uri = HickoryToSocketAddrs::new(self.authority.host.to_owned(), self.authority.port);
114                trace!(uri = ?uri, "Connecting.");
115                drop(executor);
116            } else {
117                let uri = (self.authority.host.to_owned(), self.authority.port);
118                trace!(uri = ?uri, "Connecting.");
119                let uri = (executor, uri);
120            }
121        }
122        let stream = AsyncTcpStream::connect(reactor, uri).await?;
123        let stream = match self.scheme {
124            AMQPScheme::AMQP => stream,
125            AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config).await?,
126        };
127        Ok(stream)
128    }
129}