amq_protocol_tcp/
lib.rs

1#![deny(missing_docs)]
2#![allow(clippy::result_large_err)]
3
4//! # AMQP URI TCP connection handling
5//!
6//! amq-protocol-tcp is a library aiming at providing tools to help
7//! connecting to an AMQP URI
8
9use amq_protocol_uri::{AMQPScheme, AMQPUri};
10use async_rs::{Runtime, traits::*};
11use cfg_if::cfg_if;
12use std::{io, time::Duration};
13use tracing::trace;
14
15/// Re-export TcpStream
16pub use tcp_stream::{
17    AsyncTcpStream, HandshakeError, HandshakeResult, Identity, MidHandshakeTlsStream,
18    OwnedIdentity, OwnedTLSConfig, TLSConfig, TcpStream,
19};
20
21#[cfg(feature = "native-tls")]
22pub use tcp_stream::NativeTlsConnector;
23
24#[cfg(feature = "openssl")]
25pub use tcp_stream::OpensslConnector;
26
27#[cfg(feature = "rustls-common")]
28pub use tcp_stream::{RustlsConnector, RustlsConnectorConfig};
29
30/// Trait providing a method to connect to a TcpStream
31pub trait AMQPUriTcpExt {
32    /// connect to a TcpStream
33    fn connect(&self) -> HandshakeResult
34    where
35        Self: Sized,
36    {
37        self.connect_with_config(TLSConfig::default())
38    }
39
40    /// connect to a TcpStream with the given configuration
41    fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult;
42
43    /// connect to a TcpStream
44    fn connect_async<RK: RuntimeKit + Send + Sync>(
45        &self,
46        runtime: &Runtime<RK>,
47    ) -> impl Future<Output = io::Result<AsyncTcpStream<<RK as Reactor>::TcpStream>>>
48    where
49        Self: Sized,
50    {
51        self.connect_with_config_async(TLSConfig::default(), runtime)
52    }
53
54    /// connect to a TcpStream with the given configuration
55    fn connect_with_config_async<RK: RuntimeKit + Send + Sync>(
56        &self,
57        config: TLSConfig<'_, '_, '_>,
58        runtime: &Runtime<RK>,
59    ) -> impl Future<Output = io::Result<AsyncTcpStream<<RK as Reactor>::TcpStream>>>
60    where
61        Self: Sized;
62}
63
64impl AMQPUriTcpExt for AMQPUri {
65    fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult {
66        cfg_if! {
67            if #[cfg(feature = "hickory-dns")] {
68                let uri = async_rs::HickoryToSocketAddrs::new(self.authority.host.clone(), self.authority.port);
69            } else {
70                let uri = (self.authority.host.as_str(), self.authority.port);
71            }
72        }
73        trace!(uri = ?uri, "Connecting.");
74        let stream = if let Some(timeout) = self.query.connection_timeout {
75            TcpStream::connect_timeout(uri, Duration::from_millis(timeout))
76        } else {
77            TcpStream::connect(uri)
78        }?;
79        let stream = match self.scheme {
80            AMQPScheme::AMQP => stream,
81            AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config)?,
82        };
83        stream.set_nonblocking(true)?;
84        Ok(stream)
85    }
86
87    fn connect_with_config_async<RK: RuntimeKit + Send + Sync>(
88        &self,
89        config: TLSConfig<'_, '_, '_>,
90        runtime: &Runtime<RK>,
91    ) -> impl Future<Output = io::Result<AsyncTcpStream<<RK as Reactor>::TcpStream>>>
92    where
93        Self: Sized,
94    {
95        cfg_if! {
96            if #[cfg(feature = "hickory-dns")] {
97                let uri = async_rs::HickoryToSocketAddrs::new(self.authority.host.clone(), self.authority.port);
98            } else {
99                let uri = runtime.to_socket_addrs((self.authority.host.clone(), self.authority.port));
100            }
101        }
102        trace!(uri = ?uri, "Connecting.");
103        async move {
104            let stream = AsyncTcpStream::connect(runtime, uri).await?;
105            let stream = match self.scheme {
106                AMQPScheme::AMQP => stream,
107                AMQPScheme::AMQPS => stream.into_tls(&self.authority.host, config).await?,
108            };
109            Ok(stream)
110        }
111    }
112}