#![deny(missing_docs)]
#![doc(html_root_url = "https://docs.rs/lapin-futures-tls-internal/0.5.0/")]
#![recursion_limit="128"]
pub mod error;
pub mod lapin;
pub mod uri;
pub use tokio_tcp::TcpStream;
use bytes::{Buf, BufMut};
use failure;
use futures::{self, future::Future, Poll};
use tokio_executor;
use tokio_io::{AsyncRead, AsyncWrite};
use trust_dns_resolver::AsyncResolver;
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use error::{Error, ErrorKind};
use lapin::client::ConnectionOptions;
use uri::{AMQPScheme, AMQPUri};
pub enum AMQPStream<TlsStream: AsyncRead + AsyncWrite + Send + 'static> {
Raw(TcpStream),
Tls(Box<TlsStream>),
}
pub trait AMQPConnectionTlsExt<TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static, F: FnOnce(Error) + Send + 'static> {
fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = lapin::client::Client<AMQPStream<TlsStream>>, Error = Error> + Send + 'static>;
fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static>;
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static, F: FnOnce(Error) + Send + 'static> AMQPConnectionTlsExt<TlsStream, F> for AMQPUri {
fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = lapin::client::Client<AMQPStream<TlsStream>>, Error = Error> + Send + 'static> {
Box::new(AMQPStream::from_amqp_uri(&self, connector).and_then(move |stream| connect_stream(stream, self, heartbeat_error_handler, false)).map(|(client, _)| client))
}
fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
Box::new(AMQPStream::from_amqp_uri(&self, connector).and_then(move |stream| connect_stream(stream, self, heartbeat_error_handler, true)).map(|(client, heartbeat_handle)| (client, heartbeat_handle.unwrap())))
}
}
impl<'a, TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static, F: FnOnce(Error) + Send + 'static> AMQPConnectionTlsExt<TlsStream, F> for &'a str {
fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = lapin::client::Client<AMQPStream<TlsStream>>, Error = Error> + Send + 'static> {
match self.parse::<AMQPUri>() {
Ok(uri) => uri.connect(heartbeat_error_handler, connector),
Err(err) => Box::new(futures::future::err(ErrorKind::UriParsingError(err).into())),
}
}
fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
match self.parse::<AMQPUri>() {
Ok(uri) => uri.connect_cancellable(heartbeat_error_handler, connector),
Err(err) => Box::new(futures::future::err(ErrorKind::UriParsingError(err).into())),
}
}
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AMQPStream<TlsStream> {
fn from_amqp_uri<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(uri: &AMQPUri, connector: Connector) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
match uri.scheme {
AMQPScheme::AMQP => AMQPStream::raw(uri.authority.host.clone(), uri.authority.port),
AMQPScheme::AMQPS => AMQPStream::tls(uri.authority.host.clone(), uri.authority.port, connector),
}
}
fn raw(host: String, port: u16) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
Box::new(open_tcp_stream(host, port).map(AMQPStream::Raw))
}
fn tls<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(host: String, port: u16, connector: Connector) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
Box::new(
open_tcp_stream(host.clone(), port).and_then(move |stream| {
connector(host, stream).map(AMQPStream::Tls).map_err(|e| ErrorKind::ConnectionFailed(e).into())
})
)
}
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> Read for AMQPStream<TlsStream> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.read(buf),
AMQPStream::Tls(ref mut tls) => tls.read(buf),
}
}
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AsyncRead for AMQPStream<TlsStream> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match *self {
AMQPStream::Raw(ref raw) => raw.prepare_uninitialized_buffer(buf),
AMQPStream::Tls(ref tls) => tls.prepare_uninitialized_buffer(buf),
}
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.read_buf(buf),
AMQPStream::Tls(ref mut tls) => tls.read_buf(buf),
}
}
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> Write for AMQPStream<TlsStream> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.write(buf),
AMQPStream::Tls(ref mut tls) => tls.write(buf),
}
}
fn flush(&mut self) -> io::Result<()> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.flush(),
AMQPStream::Tls(ref mut tls) => tls.flush(),
}
}
}
impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AsyncWrite for AMQPStream<TlsStream> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.shutdown(),
AMQPStream::Tls(ref mut tls) => tls.shutdown(),
}
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
match *self {
AMQPStream::Raw(ref mut raw) => raw.write_buf(buf),
AMQPStream::Tls(ref mut tls) => tls.write_buf(buf),
}
}
}
fn open_tcp_stream(host: String, port: u16) -> Box<dyn Future<Item = TcpStream, Error = Error> + Send + 'static> {
let host2 = host.clone();
Box::new(
futures::future::result(AsyncResolver::from_system_conf()).and_then(move |(resolver, background)| {
tokio_executor::spawn(background);
resolver.lookup_ip(host.as_str())
}).map_err(|e| ErrorKind::InvalidDomainName(e.to_string()).into()).and_then(|response| {
response.iter().next().ok_or_else(|| ErrorKind::InvalidDomainName(host2).into())
}).and_then(move |ipaddr| {
TcpStream::connect(&SocketAddr::new(ipaddr, port)).map_err(|e| ErrorKind::ConnectionFailed(e).into())
})
)
}
fn connect_stream<T: AsyncRead + AsyncWrite + Send + Sync + 'static, F: FnOnce(Error) + Send + 'static>(stream: T, uri: AMQPUri, heartbeat_error_handler: F, create_heartbeat_handle: bool) -> Box<dyn Future<Item = (lapin::client::Client<T>, Option<lapin::client::HeartbeatHandle>), Error = Error> + Send + 'static> {
Box::new(lapin::client::Client::connect(stream, ConnectionOptions::from_uri(uri)).map(move |(client, mut heartbeat_future)| {
let heartbeat_handle = if create_heartbeat_handle { heartbeat_future.handle() } else { None };
tokio_executor::spawn(heartbeat_future.map_err(|e| heartbeat_error_handler(e.into())));
(client, heartbeat_handle)
}).map_err(|e| ErrorKind::ProtocolError(e).into()))
}