1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#![deny(missing_docs)]
#![warn(rust_2018_idioms)]
#![doc(html_root_url = "https://docs.rs/amq-protocol-tcp/6.0.0-rc9/")]

//! # AMQP URI TCP connection handling
//!
//! amq-protocol-tcp is a library providing tools to help
//! connect to an AMQP URI

use amq_protocol_uri::{AMQPScheme, AMQPUri};
use log::trace;
use std::{
    mem::ManuallyDrop,
    ops::{Deref, DerefMut},
    time::Duration,
};

/// Re-export TcpStream and its related TLS/handshake types
pub use tcp_stream::{
    HandshakeError, HandshakeResult, Identity, MidHandshakeTlsStream, TLSConfig, TcpStream,
};

#[cfg(feature = "native-tls")]
pub use tcp_stream::NativeTlsConnector;

#[cfg(feature = "openssl")]
pub use tcp_stream::OpenSslConnector;

#[cfg(feature = "rustls-connector")]
pub use tcp_stream::{RustlsConnector, RustlsConnectorConfig};

/// Extension trait adding TcpStream connection helpers to a type
pub trait AMQPUriTcpExt {
    /// Connect to a TcpStream using the given TLS configuration
    ///
    /// This is the only method implementors have to provide.
    fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult;

    /// Connect to a TcpStream using `TLSConfig::default()`
    ///
    /// Equivalent to calling `connect_with_config` with the default configuration.
    fn connect(&self) -> HandshakeResult
    where
        Self: Sized,
    {
        let default_config = TLSConfig::default();
        self.connect_with_config(default_config)
    }
}

impl AMQPUriTcpExt for AMQPUri {
    /// Connect to the `host:port` taken from the URI's authority.
    ///
    /// When the URI's query carries a `connection_timeout`, it is used (as a number of
    /// milliseconds) to bound the connection attempt; otherwise a plain connect is issued.
    /// For `AMQPScheme::AMQP` the stream is returned as is, for `AMQPScheme::AMQPS` it is
    /// handed to `into_tls` together with the host name and `config`.
    fn connect_with_config(&self, config: TLSConfig<'_, '_, '_>) -> HandshakeResult {
        let authority = &self.authority;
        let uri = format!("{}:{}", authority.host, authority.port);
        trace!("Connecting to {}", uri);

        // Any connection error is propagated to the caller through `?`.
        let stream = match self.query.connection_timeout {
            Some(timeout) => TcpStream::connect_timeout(uri, Duration::from_millis(timeout))?,
            None => TcpStream::connect(uri)?,
        };

        match self.scheme {
            AMQPScheme::AMQPS => stream.into_tls(&authority.host, config),
            AMQPScheme::AMQP => Ok(stream),
        }
    }
}

/// Unsafe wrapper "cloning" the TcpStream but not closing it on drop.
///
/// The stream is stored inside a `ManuallyDrop`, so dropping the wrapper does not run the
/// destructor of the inner TcpStream. The wrapped stream is reachable through `Deref` and
/// `DerefMut`.
pub struct TcpStreamWrapper(ManuallyDrop<TcpStream>);

impl Deref for TcpStreamWrapper {
    type Target = TcpStream;

    /// Give shared access to the wrapped TcpStream.
    fn deref(&self) -> &TcpStream {
        // Deref coercion sees through the `ManuallyDrop` layer.
        &self.0
    }
}

impl DerefMut for TcpStreamWrapper {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.0
    }
}

#[cfg(unix)]
mod sys {
    use crate::{TcpStream, TcpStreamWrapper};
    use std::mem::ManuallyDrop;
    use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};

    impl TcpStreamWrapper {
        /// Create a wrapper around the same raw file descriptor as `socket`.
        ///
        /// # Safety
        ///
        /// The wrapper is built from the raw file descriptor of `socket` and never closes it
        /// on drop, so the original TcpStream needs to live at least as long as the wrapper.
        pub unsafe fn new(socket: &TcpStream) -> Self {
            let fd = socket.as_raw_fd();
            // Stored in `ManuallyDrop` so that the stream built from `fd` is never dropped.
            Self(ManuallyDrop::new(TcpStream::from_raw_fd(fd)))
        }
    }

    impl AsRawFd for TcpStreamWrapper {
        fn as_raw_fd(&self) -> RawFd {
            let inner: &TcpStream = &self.0;
            inner.as_raw_fd()
        }
    }
}

#[cfg(windows)]
mod sys {
    use crate::{TcpStream, TcpStreamWrapper};
    use std::mem::ManuallyDrop;
    use std::os::windows::io::{AsRawSocket, FromRawSocket, RawSocket};

    impl TcpStreamWrapper {
        /// Create a wrapper around the same raw socket as `socket`.
        ///
        /// # Safety
        ///
        /// The wrapper is built from the raw socket of `socket` and never closes it on drop,
        /// so the original TcpStream needs to live at least as long as the wrapper.
        pub unsafe fn new(socket: &TcpStream) -> Self {
            let raw = socket.as_raw_socket();
            // Stored in `ManuallyDrop` so that the stream built from `raw` is never dropped.
            Self(ManuallyDrop::new(TcpStream::from_raw_socket(raw)))
        }
    }

    impl AsRawSocket for TcpStreamWrapper {
        fn as_raw_socket(&self) -> RawSocket {
            let inner: &TcpStream = &self.0;
            inner.as_raw_socket()
        }
    }
}