lapin_futures_tls_api/
lib.rs

1#![deny(missing_docs)]
2#![warn(rust_2018_idioms)]
3#![doc(html_root_url = "https://docs.rs/lapin-futures-tls-api/0.19.1/")]
4
5//! lapin-futures-tls-api
6//!
7//! This library offers a nice integration of `tls-api` with the `lapin-futures` library.
//! It uses the URI parsing feature of `amq-protocol` and adds the `connect` and `connect_cancellable`
9//! methods to `AMQPUri` which will provide you with a `lapin_futures::client::Client` and
10//! optionally a `lapin_futures::client::HeartbeatHandle` wrapped in a `Future`.
11//!
12//! It autodetects whether you're using `amqp` or `amqps` and opens either a raw `TcpStream`
13//! or a `TlsStream` using `tls-api` as the SSL api.
14//!
15//! ## Connecting and opening a channel
16//!
17//! ```rust,no_run
18//! use env_logger;
19//! use failure::Error;
20//! use futures::future::Future;
21//! use lapin_futures_tls_api::{AMQPConnectionTlsApiExt, lapin};
22//! use lapin::channel::ConfirmSelectOptions;
23//! use tokio;
24//!
25//! fn main() {
26//!     env_logger::init();
27//!
28//!     tokio::run(
29//!         "amqps://user:pass@host/vhost?heartbeat=10".connect_cancellable::<tls_api_stub::TlsConnector, _>(|err| {
30//!             eprintln!("heartbeat error: {:?}", err);
31//!         }).map_err(Error::from).and_then(|(client, heartbeat_handle)| {
32//!             println!("Connected!");
33//!             client.create_confirm_channel(ConfirmSelectOptions::default()).map(|channel| (channel, heartbeat_handle)).and_then(|(channel, heartbeat_handle)| {
34//!                 println!("Stopping heartbeat.");
35//!                 heartbeat_handle.stop();
36//!                 println!("Closing channel.");
37//!                 channel.close(200, "Bye")
38//!             }).map_err(Error::from)
39//!         }).map_err(|err| {
40//!             eprintln!("amqp error: {:?}", err);
41//!         })
42//!     );
43//! }
44//! ```
45
46/// Reexport of the `lapin_futures_tls_internal` errors
47#[deprecated(note = "use lapin directly instead")]
48pub mod error;
49/// Reexport of the `lapin_futures` crate
50#[deprecated(note = "use lapin directly instead")]
51pub mod lapin;
52/// Reexport of the `uri` module from the `amq_protocol` crate
53#[deprecated(note = "use lapin directly instead")]
54pub mod uri;
55
/// Reexport of `lapin_futures_tls_internal::AMQPStream`, specialised to a `tokio_tls_api::TlsStream` wrapping a `TcpStream`
57#[deprecated(note = "use lapin directly instead")]
58pub type AMQPStream = lapin_futures_tls_internal::AMQPStream<TlsStream<TcpStream>>;
59
60use futures::{self, future::Future};
61use lapin_futures_tls_internal::{self, AMQPConnectionTlsExt, error::Error, lapin::client::ConnectionProperties, TcpStream};
62use tls_api::{TlsConnector, TlsConnectorBuilder};
63use tokio_tls_api::{self, TlsStream};
64
65use std::io;
66
67use uri::AMQPUri;
68
69fn connector<C: TlsConnector + Send + 'static>(host: String, stream: TcpStream) -> Box<dyn Future<Item = Box<TlsStream<TcpStream>>, Error = io::Error> + Send + 'static> {
70    Box::new(futures::future::result(C::builder().and_then(TlsConnectorBuilder::build).map_err(From::from)).and_then(move |connector| {
71        tokio_tls_api::connect_async(&connector, &host, stream).map_err(From::from).map(Box::new)
72    }))
73}
74
75/// Add a connect method providing a `lapin_futures::client::Client` wrapped in a `Future`.
76#[deprecated(note = "use lapin directly instead")]
77pub trait AMQPConnectionTlsApiExt: AMQPConnectionTlsExt<TlsStream<TcpStream>> where Self: Sized {
78    /// Method providing a `lapin_futures::client::Client`, a `lapin_futures::client::HeartbeatHandle` and a `lapin::client::Heartbeat` pulse wrapped in a `Future`
79    fn connect<C: TlsConnector + Send + 'static>(self) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
80        AMQPConnectionTlsExt::connect(self, connector::<C>)
81    }
82    /// Method providing a `lapin_futures::client::Client` and `lapin_futures::client::HeartbeatHandle` wrapped in a `Future`
83    fn connect_cancellable<C: TlsConnector + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
84        AMQPConnectionTlsExt::connect_cancellable(self, heartbeat_error_handler, connector::<C>)
85    }
86    /// Method providing a `lapin_futures::client::Client`, a `lapin_futures::client::HeartbeatHandle` and a `lapin::client::Heartbeat` pulse wrapped in a `Future`
87    fn connect_full<C: TlsConnector + Send + 'static>(self, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
88        AMQPConnectionTlsExt::connect_full(self, connector::<C>, properties)
89    }
90    /// Method providing a `lapin_futures::client::Client` and `lapin_futures::client::HeartbeatHandle` wrapped in a `Future`
91    fn connect_cancellable_full<C: TlsConnector + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
92        AMQPConnectionTlsExt::connect_cancellable_full(self, heartbeat_error_handler, connector::<C>, properties)
93    }
94}
95
96impl AMQPConnectionTlsApiExt for AMQPUri {}
97impl<'a> AMQPConnectionTlsApiExt for &'a str {}