#![deny(missing_docs)]
#![warn(rust_2018_idioms)]
#![doc(html_root_url = "https://docs.rs/lapin-futures-tls-internal/0.7.1/")]
#![recursion_limit="128"]
#[deprecated(note = "use lapin directly instead")]
pub mod error;
#[deprecated(note = "use lapin directly instead")]
pub mod lapin;
#[deprecated(note = "use lapin directly instead")]
pub mod uri;
#[deprecated(note = "use lapin directly instead")]
pub use tokio_tcp::TcpStream;
use bytes::{Buf, BufMut};
use failure;
use futures::{self, future::Future, Poll};
use tokio_executor;
use tokio_io::{AsyncRead, AsyncWrite};
use trust_dns_resolver::AsyncResolver;
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use error::{Error, ErrorKind};
use lapin::client::{ConnectionOptions, ConnectionProperties};
use uri::{AMQPScheme, AMQPUri};
#[deprecated(note = "use lapin directly instead")]
pub enum AMQPStream<TlsStream: AsyncRead + AsyncWrite + Send + 'static> {
Raw(TcpStream),
Tls(Box<TlsStream>),
}
#[deprecated(note = "use lapin directly instead")]
pub trait AMQPConnectionTlsExt<TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> {
fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static>;
fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static>;
fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static>;
fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static>;
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> AMQPConnectionTlsExt<TlsStream> for AMQPUri {
fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
self.connect_full(connector, ConnectionProperties::default())
}
fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
self.connect_cancellable_full(heartbeat_error_handler, connector, ConnectionProperties::default())
}
fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
Box::new(AMQPStream::from_amqp_uri(&self, connector).and_then(move |stream| lapin::client::Client::connect(stream, ConnectionOptions::from_uri(self, properties)).map(|(client, mut heartbeat)| (client, heartbeat.handle().unwrap(), Box::new(heartbeat.map_err(|e| ErrorKind::ProtocolError(e).into())) as Box<dyn Future<Item = (), Error = Error> + Send + 'static>)).map_err(|e| ErrorKind::ProtocolError(e).into())))
}
fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
Box::new(self.connect_full(connector, properties).map(move |(client, heartbeat_handle, heartbeat_future)| {
tokio_executor::spawn(heartbeat_future.map_err(|e| heartbeat_error_handler(e)));
(client, heartbeat_handle)
}))
}
}
macro_rules! try_uri (
($self: expr) => ({
match $self.parse::<AMQPUri>() {
Ok(uri) => uri,
Err(err) => return Box::new(futures::future::err(ErrorKind::UriParsingError(err).into())),
}
});
);
impl<'a, TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> AMQPConnectionTlsExt<TlsStream> for &'a str {
fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
try_uri!(self).connect(connector)
}
fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
try_uri!(self).connect_cancellable(heartbeat_error_handler, connector)
}
fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
try_uri!(self).connect_full(connector, properties)
}
fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
try_uri!(self).connect_cancellable_full(heartbeat_error_handler, connector, properties)
}
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AMQPStream<TlsStream> {
fn from_amqp_uri<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(uri: &AMQPUri, connector: Connector) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
match uri.scheme {
AMQPScheme::AMQP => AMQPStream::raw(uri.authority.host.clone(), uri.authority.port),
AMQPScheme::AMQPS => AMQPStream::tls(uri.authority.host.clone(), uri.authority.port, connector),
}
}
fn raw(host: String, port: u16) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
Box::new(open_tcp_stream(host, port).map(AMQPStream::Raw))
}
fn tls<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(host: String, port: u16, connector: Connector) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
Box::new(
open_tcp_stream(host.clone(), port).and_then(move |stream| {
connector(host, stream).map(AMQPStream::Tls).map_err(|e| ErrorKind::ConnectionFailed(e).into())
})
)
}
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> Read for AMQPStream<TlsStream> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.read(buf),
AMQPStream::Tls(ref mut tls) => tls.read(buf),
}
}
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AsyncRead for AMQPStream<TlsStream> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match *self {
AMQPStream::Raw(ref raw) => raw.prepare_uninitialized_buffer(buf),
AMQPStream::Tls(ref tls) => tls.prepare_uninitialized_buffer(buf),
}
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.read_buf(buf),
AMQPStream::Tls(ref mut tls) => tls.read_buf(buf),
}
}
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> Write for AMQPStream<TlsStream> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.write(buf),
AMQPStream::Tls(ref mut tls) => tls.write(buf),
}
}
fn flush(&mut self) -> io::Result<()> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.flush(),
AMQPStream::Tls(ref mut tls) => tls.flush(),
}
}
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AsyncWrite for AMQPStream<TlsStream> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.shutdown(),
AMQPStream::Tls(ref mut tls) => tls.shutdown(),
}
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.write_buf(buf),
AMQPStream::Tls(ref mut tls) => tls.write_buf(buf),
}
}
}
fn open_tcp_stream(host: String, port: u16) -> Box<dyn Future<Item = TcpStream, Error = Error> + Send + 'static> {
let host2 = host.clone();
Box::new(
futures::future::result(AsyncResolver::from_system_conf()).and_then(move |(resolver, background)| {
tokio_executor::spawn(background);
resolver.lookup_ip(host.as_str())
}).map_err(|e| ErrorKind::InvalidDomainName(e.to_string()).into()).and_then(|response| {
response.iter().next().ok_or_else(|| ErrorKind::InvalidDomainName(host2).into())
}).and_then(move |ipaddr| {
TcpStream::connect(&SocketAddr::new(ipaddr, port)).map_err(|e| ErrorKind::ConnectionFailed(e).into())
})
)
}