lapin_futures_tls_internal/
lib.rs

1#![deny(missing_docs)]
2#![warn(rust_2018_idioms)]
3#![doc(html_root_url = "https://docs.rs/lapin-futures-tls-internal/0.7.1/")]
4#![recursion_limit="128"]
5
//! lapin-futures-tls-internal
//!
//! This library provides the TLS-implementation-agnostic building blocks used to integrate a TLS stream with the `lapin-futures` library.
9//! It uses `amq-protocol` URI parsing feature and adds the `connect` and `connect_cancellable`
10//! methods to `AMQPUri` which will provide you with a `lapin_futures::client::Client` and
11//! optionally a `lapin_futures::client::HeartbeatHandle` wrapped in a `Future`.
12//!
13//! It autodetects whether you're using `amqp` or `amqps` and opens either a raw `TcpStream`
14//! or a `TlsStream`.
15//!
16//! ## Connecting and opening a channel
17//!
18//! ```rust,no_run
19//! use env_logger;
20//! use failure::Error;
21//! use futures::{self, future::Future};
22//! use lapin_futures_tls_internal::{AMQPConnectionTlsExt, lapin};
23//! use lapin::channel::ConfirmSelectOptions;
24//! use native_tls;
25//! use tokio;
26//! use tokio_tls::TlsConnector;
27//!
28//! use std::io;
29//!
30//! fn main() {
31//!     env_logger::init();
32//!
33//!     tokio::run(
34//!         "amqps://user:pass@host/vhost?heartbeat=10".connect_cancellable(|err| {
35//!             eprintln!("heartbeat error: {:?}", err);
36//!         }, |host, stream| {
37//!             Box::new(futures::future::result(native_tls::TlsConnector::builder().build().map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to create connector"))).and_then(move |connector| {
38//!                 TlsConnector::from(connector).connect(&host, stream).map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to connect")).map(Box::new)
39//!             }))
40//!         }).map_err(Error::from).and_then(|(client, heartbeat_handle)| {
41//!             println!("Connected!");
42//!             client.create_confirm_channel(ConfirmSelectOptions::default()).map(|channel| (channel, heartbeat_handle)).and_then(|(channel, heartbeat_handle)| {
43//!                 println!("Stopping heartbeat.");
44//!                 heartbeat_handle.stop();
45//!                 println!("Closing channel.");
46//!                 channel.close(200, "Bye")
47//!             }).map_err(Error::from)
48//!         }).map_err(|err| {
49//!             eprintln!("amqp error: {:?}", err);
50//!         })
51//!     );
52//! }
53//! ```
54
/// The error types that can be returned by this crate.
56#[deprecated(note = "use lapin directly instead")]
57pub mod error;
58/// Reexport of the `lapin_futures` crate
59#[deprecated(note = "use lapin directly instead")]
60pub mod lapin;
61/// Reexport of the `uri` module from the `amq_protocol` crate
62#[deprecated(note = "use lapin directly instead")]
63pub mod uri;
64
65/// Reexport of `TcpStream`
66#[deprecated(note = "use lapin directly instead")]
67pub use tokio_tcp::TcpStream;
68
69use bytes::{Buf, BufMut};
70use failure;
71use futures::{self, future::Future, Poll};
72use tokio_executor;
73use tokio_io::{AsyncRead, AsyncWrite};
74use trust_dns_resolver::AsyncResolver;
75
76use std::io::{self, Read, Write};
77use std::net::SocketAddr;
78
79use error::{Error, ErrorKind};
80use lapin::client::{ConnectionOptions, ConnectionProperties};
81use uri::{AMQPScheme, AMQPUri};
82
83/// Represents either a raw `TcpStream` or a `TlsStream`.
84/// The `TlsStream` is wrapped in a `Box` to keep the enum footprint minimal.
85#[deprecated(note = "use lapin directly instead")]
86pub enum AMQPStream<TlsStream: AsyncRead + AsyncWrite + Send + 'static> {
87    /// The raw `TcpStream` used for basic AMQP connections.
88    Raw(TcpStream),
89    /// The `TlsStream` used for AMQPs connections.
90    Tls(Box<TlsStream>),
91}
92
93/// Add a connect method providing a `lapin_futures::client::Client` wrapped in a `Future`.
94#[deprecated(note = "use lapin directly instead")]
95pub trait AMQPConnectionTlsExt<TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> {
96    /// Method providing a `lapin_futures::client::Client`, a `lapin_futures::client::HeartbeatHandle` and a `lapin::client::Heartbeat` pulse wrapped in a `Future`
97    fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static>;
98    /// Method providing a `lapin_futures::client::Client` and `lapin_futures::client::HeartbeatHandle` wrapped in a `Future`
99    fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static>;
100    /// Method providing a `lapin_futures::client::Client`, a `lapin_futures::client::HeartbeatHandle` and a `lapin::client::Heartbeat` pulse wrapped in a `Future`
101    fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static>;
102    /// Method providing a `lapin_futures::client::Client` and `lapin_futures::client::HeartbeatHandle` wrapped in a `Future`
103    fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static>;
104}
105
106impl<TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> AMQPConnectionTlsExt<TlsStream> for AMQPUri {
107    fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
108        self.connect_full(connector, ConnectionProperties::default())
109    }
110
111    fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
112        self.connect_cancellable_full(heartbeat_error_handler, connector, ConnectionProperties::default())
113    }
114
115    fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
116        Box::new(AMQPStream::from_amqp_uri(&self, connector).and_then(move |stream| lapin::client::Client::connect(stream, ConnectionOptions::from_uri(self, properties)).map(|(client, mut heartbeat)| (client, heartbeat.handle().unwrap(), Box::new(heartbeat.map_err(|e| ErrorKind::ProtocolError(e).into())) as Box<dyn Future<Item = (), Error = Error> + Send + 'static>)).map_err(|e| ErrorKind::ProtocolError(e).into())))
117    }
118
119    fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
120        Box::new(self.connect_full(connector, properties).map(move |(client, heartbeat_handle, heartbeat_future)| {
121            tokio_executor::spawn(heartbeat_future.map_err(|e| heartbeat_error_handler(e)));
122            (client, heartbeat_handle)
123        }))
124    }
125}
126
/// Parses the given expression as an `AMQPUri` and evaluates to the parsed URI.
///
/// On a parse failure, the enclosing function returns early with a boxed future
/// that has already failed with `ErrorKind::UriParsingError`.
macro_rules! try_uri {
    ($input:expr) => {{
        match $input.parse::<AMQPUri>() {
            Err(err) => return Box::new(futures::future::err(ErrorKind::UriParsingError(err).into())),
            Ok(uri) => uri,
        }
    }};
}
135
136impl<'a, TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> AMQPConnectionTlsExt<TlsStream> for &'a str {
137    fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
138        try_uri!(self).connect(connector)
139    }
140
141    fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
142        try_uri!(self).connect_cancellable(heartbeat_error_handler, connector)
143    }
144
145    fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
146        try_uri!(self).connect_full(connector, properties)
147    }
148
149    fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
150        try_uri!(self).connect_cancellable_full(heartbeat_error_handler, connector, properties)
151    }
152}
153
154impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AMQPStream<TlsStream> {
155    fn from_amqp_uri<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(uri: &AMQPUri, connector: Connector) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
156        match uri.scheme {
157            AMQPScheme::AMQP  => AMQPStream::raw(uri.authority.host.clone(), uri.authority.port),
158            AMQPScheme::AMQPS => AMQPStream::tls(uri.authority.host.clone(), uri.authority.port, connector),
159        }
160    }
161
162    fn raw(host: String, port: u16) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
163        Box::new(open_tcp_stream(host, port).map(AMQPStream::Raw))
164    }
165
166    fn tls<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(host: String, port: u16, connector: Connector) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
167        Box::new(
168            open_tcp_stream(host.clone(), port).and_then(move |stream| {
169                connector(host, stream).map(AMQPStream::Tls).map_err(|e| ErrorKind::ConnectionFailed(e).into())
170            })
171        )
172    }
173}
174
175impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> Read for AMQPStream<TlsStream> {
176    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
177        match *self {
178            AMQPStream::Raw(ref mut raw) => raw.read(buf),
179            AMQPStream::Tls(ref mut tls) => tls.read(buf),
180        }
181    }
182}
183
184impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AsyncRead for AMQPStream<TlsStream> {
185    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
186        match *self {
187            AMQPStream::Raw(ref raw) => raw.prepare_uninitialized_buffer(buf),
188            AMQPStream::Tls(ref tls) => tls.prepare_uninitialized_buffer(buf),
189        }
190    }
191
192    fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
193        match *self {
194            AMQPStream::Raw(ref mut raw) => raw.read_buf(buf),
195            AMQPStream::Tls(ref mut tls) => tls.read_buf(buf),
196        }
197    }
198}
199
200impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> Write for AMQPStream<TlsStream> {
201    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
202        match *self {
203            AMQPStream::Raw(ref mut raw) => raw.write(buf),
204            AMQPStream::Tls(ref mut tls) => tls.write(buf),
205        }
206    }
207
208    fn flush(&mut self) -> io::Result<()> {
209        match *self {
210            AMQPStream::Raw(ref mut raw) => raw.flush(),
211            AMQPStream::Tls(ref mut tls) => tls.flush(),
212        }
213    }
214}
215
216impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AsyncWrite for AMQPStream<TlsStream> {
217    fn shutdown(&mut self) -> Poll<(), io::Error> {
218        match *self {
219            AMQPStream::Raw(ref mut raw) => raw.shutdown(),
220            AMQPStream::Tls(ref mut tls) => tls.shutdown(),
221        }
222    }
223
224    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
225        match *self {
226            AMQPStream::Raw(ref mut raw) => raw.write_buf(buf),
227            AMQPStream::Tls(ref mut tls) => tls.write_buf(buf),
228        }
229    }
230}
231
232fn open_tcp_stream(host: String, port: u16) -> Box<dyn Future<Item = TcpStream, Error = Error> + Send + 'static> {
233    let host2 = host.clone();
234    Box::new(
235        futures::future::result(AsyncResolver::from_system_conf()).and_then(move |(resolver, background)| {
236            tokio_executor::spawn(background);
237            resolver.lookup_ip(host.as_str())
238        }).map_err(|e| ErrorKind::InvalidDomainName(e.to_string()).into()).and_then(|response| {
239            response.iter().next().ok_or_else(|| ErrorKind::InvalidDomainName(host2).into())
240        }).and_then(move |ipaddr| {
241            TcpStream::connect(&SocketAddr::new(ipaddr, port)).map_err(|e| ErrorKind::ConnectionFailed(e).into())
242        })
243    )
244}