1#![deny(missing_docs)]
2#![warn(rust_2018_idioms)]
3#![doc(html_root_url = "https://docs.rs/lapin-futures-tls-internal/0.7.1/")]
4#![recursion_limit="128"]
5
6#[deprecated(note = "use lapin directly instead")]
57pub mod error;
58#[deprecated(note = "use lapin directly instead")]
60pub mod lapin;
61#[deprecated(note = "use lapin directly instead")]
63pub mod uri;
64
65#[deprecated(note = "use lapin directly instead")]
67pub use tokio_tcp::TcpStream;
68
69use bytes::{Buf, BufMut};
70use failure;
71use futures::{self, future::Future, Poll};
72use tokio_executor;
73use tokio_io::{AsyncRead, AsyncWrite};
74use trust_dns_resolver::AsyncResolver;
75
76use std::io::{self, Read, Write};
77use std::net::SocketAddr;
78
79use error::{Error, ErrorKind};
80use lapin::client::{ConnectionOptions, ConnectionProperties};
81use uri::{AMQPScheme, AMQPUri};
82
83#[deprecated(note = "use lapin directly instead")]
86pub enum AMQPStream<TlsStream: AsyncRead + AsyncWrite + Send + 'static> {
87    Raw(TcpStream),
89    Tls(Box<TlsStream>),
91}
92
93#[deprecated(note = "use lapin directly instead")]
95pub trait AMQPConnectionTlsExt<TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> {
96    fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static>;
98    fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static>;
100    fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static>;
102    fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static>;
104}
105
106impl<TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> AMQPConnectionTlsExt<TlsStream> for AMQPUri {
107    fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
108        self.connect_full(connector, ConnectionProperties::default())
109    }
110
111    fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
112        self.connect_cancellable_full(heartbeat_error_handler, connector, ConnectionProperties::default())
113    }
114
115    fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
116        Box::new(AMQPStream::from_amqp_uri(&self, connector).and_then(move |stream| lapin::client::Client::connect(stream, ConnectionOptions::from_uri(self, properties)).map(|(client, mut heartbeat)| (client, heartbeat.handle().unwrap(), Box::new(heartbeat.map_err(|e| ErrorKind::ProtocolError(e).into())) as Box<dyn Future<Item = (), Error = Error> + Send + 'static>)).map_err(|e| ErrorKind::ProtocolError(e).into())))
117    }
118
119    fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
120        Box::new(self.connect_full(connector, properties).map(move |(client, heartbeat_handle, heartbeat_future)| {
121            tokio_executor::spawn(heartbeat_future.map_err(|e| heartbeat_error_handler(e)));
122            (client, heartbeat_handle)
123        }))
124    }
125}
126
127macro_rules! try_uri (
128    ($self: expr) => ({
129        match $self.parse::<AMQPUri>() {
130            Ok(uri) => uri,
131            Err(err) => return Box::new(futures::future::err(ErrorKind::UriParsingError(err).into())),
132        }
133    });
134);
135
136impl<'a, TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> AMQPConnectionTlsExt<TlsStream> for &'a str {
137    fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
138        try_uri!(self).connect(connector)
139    }
140
141    fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
142        try_uri!(self).connect_cancellable(heartbeat_error_handler, connector)
143    }
144
145    fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
146        try_uri!(self).connect_full(connector, properties)
147    }
148
149    fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
150        try_uri!(self).connect_cancellable_full(heartbeat_error_handler, connector, properties)
151    }
152}
153
154impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AMQPStream<TlsStream> {
155    fn from_amqp_uri<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(uri: &AMQPUri, connector: Connector) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
156        match uri.scheme {
157            AMQPScheme::AMQP  => AMQPStream::raw(uri.authority.host.clone(), uri.authority.port),
158            AMQPScheme::AMQPS => AMQPStream::tls(uri.authority.host.clone(), uri.authority.port, connector),
159        }
160    }
161
162    fn raw(host: String, port: u16) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
163        Box::new(open_tcp_stream(host, port).map(AMQPStream::Raw))
164    }
165
166    fn tls<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(host: String, port: u16, connector: Connector) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
167        Box::new(
168            open_tcp_stream(host.clone(), port).and_then(move |stream| {
169                connector(host, stream).map(AMQPStream::Tls).map_err(|e| ErrorKind::ConnectionFailed(e).into())
170            })
171        )
172    }
173}
174
175impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> Read for AMQPStream<TlsStream> {
176    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
177        match *self {
178            AMQPStream::Raw(ref mut raw) => raw.read(buf),
179            AMQPStream::Tls(ref mut tls) => tls.read(buf),
180        }
181    }
182}
183
184impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AsyncRead for AMQPStream<TlsStream> {
185    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
186        match *self {
187            AMQPStream::Raw(ref raw) => raw.prepare_uninitialized_buffer(buf),
188            AMQPStream::Tls(ref tls) => tls.prepare_uninitialized_buffer(buf),
189        }
190    }
191
192    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
193        match *self {
194            AMQPStream::Raw(ref mut raw) => raw.read_buf(buf),
195            AMQPStream::Tls(ref mut tls) => tls.read_buf(buf),
196        }
197    }
198}
199
200impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> Write for AMQPStream<TlsStream> {
201    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
202        match *self {
203            AMQPStream::Raw(ref mut raw) => raw.write(buf),
204            AMQPStream::Tls(ref mut tls) => tls.write(buf),
205        }
206    }
207
208    fn flush(&mut self) -> io::Result<()> {
209        match *self {
210            AMQPStream::Raw(ref mut raw) => raw.flush(),
211            AMQPStream::Tls(ref mut tls) => tls.flush(),
212        }
213    }
214}
215
216impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AsyncWrite for AMQPStream<TlsStream> {
217    fn shutdown(&mut self) -> Poll<(), io::Error> {
218        match *self {
219            AMQPStream::Raw(ref mut raw) => raw.shutdown(),
220            AMQPStream::Tls(ref mut tls) => tls.shutdown(),
221        }
222    }
223
224    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
225        match *self {
226            AMQPStream::Raw(ref mut raw) => raw.write_buf(buf),
227            AMQPStream::Tls(ref mut tls) => tls.write_buf(buf),
228        }
229    }
230}
231
232fn open_tcp_stream(host: String, port: u16) -> Box<dyn Future<Item = TcpStream, Error = Error> + Send + 'static> {
233    let host2 = host.clone();
234    Box::new(
235        futures::future::result(AsyncResolver::from_system_conf()).and_then(move |(resolver, background)| {
236            tokio_executor::spawn(background);
237            resolver.lookup_ip(host.as_str())
238        }).map_err(|e| ErrorKind::InvalidDomainName(e.to_string()).into()).and_then(|response| {
239            response.iter().next().ok_or_else(|| ErrorKind::InvalidDomainName(host2).into())
240        }).and_then(move |ipaddr| {
241            TcpStream::connect(&SocketAddr::new(ipaddr, port)).map_err(|e| ErrorKind::ConnectionFailed(e).into())
242        })
243    )
244}