1#![deny(missing_docs)]
2#![warn(rust_2018_idioms)]
3#![doc(html_root_url = "https://docs.rs/lapin-futures-tls-internal/0.7.1/")]
4#![recursion_limit="128"]
5
6#[deprecated(note = "use lapin directly instead")]
57pub mod error;
58#[deprecated(note = "use lapin directly instead")]
60pub mod lapin;
61#[deprecated(note = "use lapin directly instead")]
63pub mod uri;
64
65#[deprecated(note = "use lapin directly instead")]
67pub use tokio_tcp::TcpStream;
68
69use bytes::{Buf, BufMut};
70use failure;
71use futures::{self, future::Future, Poll};
72use tokio_executor;
73use tokio_io::{AsyncRead, AsyncWrite};
74use trust_dns_resolver::AsyncResolver;
75
76use std::io::{self, Read, Write};
77use std::net::SocketAddr;
78
79use error::{Error, ErrorKind};
80use lapin::client::{ConnectionOptions, ConnectionProperties};
81use uri::{AMQPScheme, AMQPUri};
82
83#[deprecated(note = "use lapin directly instead")]
86pub enum AMQPStream<TlsStream: AsyncRead + AsyncWrite + Send + 'static> {
87 Raw(TcpStream),
89 Tls(Box<TlsStream>),
91}
92
93#[deprecated(note = "use lapin directly instead")]
95pub trait AMQPConnectionTlsExt<TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> {
96 fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static>;
98 fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static>;
100 fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static>;
102 fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static>;
104}
105
106impl<TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> AMQPConnectionTlsExt<TlsStream> for AMQPUri {
107 fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
108 self.connect_full(connector, ConnectionProperties::default())
109 }
110
111 fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
112 self.connect_cancellable_full(heartbeat_error_handler, connector, ConnectionProperties::default())
113 }
114
115 fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
116 Box::new(AMQPStream::from_amqp_uri(&self, connector).and_then(move |stream| lapin::client::Client::connect(stream, ConnectionOptions::from_uri(self, properties)).map(|(client, mut heartbeat)| (client, heartbeat.handle().unwrap(), Box::new(heartbeat.map_err(|e| ErrorKind::ProtocolError(e).into())) as Box<dyn Future<Item = (), Error = Error> + Send + 'static>)).map_err(|e| ErrorKind::ProtocolError(e).into())))
117 }
118
119 fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
120 Box::new(self.connect_full(connector, properties).map(move |(client, heartbeat_handle, heartbeat_future)| {
121 tokio_executor::spawn(heartbeat_future.map_err(|e| heartbeat_error_handler(e)));
122 (client, heartbeat_handle)
123 }))
124 }
125}
126
// Parses the given expression as an `AMQPUri` and evaluates to the parsed value.
// On a parse failure it returns early from the enclosing function with a boxed,
// already-failed future carrying `ErrorKind::UriParsingError`.
macro_rules! try_uri {
    ($input: expr) => {{
        match $input.parse::<AMQPUri>() {
            Ok(parsed) => parsed,
            Err(parse_error) => {
                return Box::new(futures::future::err(ErrorKind::UriParsingError(parse_error).into()));
            }
        }
    }};
}
135
136impl<'a, TlsStream: AsyncRead + AsyncWrite + Send + Sync + 'static> AMQPConnectionTlsExt<TlsStream> for &'a str {
137 fn connect<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
138 try_uri!(self).connect(connector)
139 }
140
141 fn connect_cancellable<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
142 try_uri!(self).connect_cancellable(heartbeat_error_handler, connector)
143 }
144
145 fn connect_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(self, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle, Box<dyn Future<Item = (), Error = Error> + Send + 'static>), Error = Error> + Send + 'static> {
146 try_uri!(self).connect_full(connector, properties)
147 }
148
149 fn connect_cancellable_full<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static, F: FnOnce(Error) + Send + 'static>(self, heartbeat_error_handler: F, connector: Connector, properties: ConnectionProperties) -> Box<dyn Future<Item = (lapin::client::Client<AMQPStream<TlsStream>>, lapin::client::HeartbeatHandle), Error = Error> + Send + 'static> {
150 try_uri!(self).connect_cancellable_full(heartbeat_error_handler, connector, properties)
151 }
152}
153
154impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AMQPStream<TlsStream> {
155 fn from_amqp_uri<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(uri: &AMQPUri, connector: Connector) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
156 match uri.scheme {
157 AMQPScheme::AMQP => AMQPStream::raw(uri.authority.host.clone(), uri.authority.port),
158 AMQPScheme::AMQPS => AMQPStream::tls(uri.authority.host.clone(), uri.authority.port, connector),
159 }
160 }
161
162 fn raw(host: String, port: u16) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
163 Box::new(open_tcp_stream(host, port).map(AMQPStream::Raw))
164 }
165
166 fn tls<Connector: FnOnce(String, TcpStream) -> Box<dyn Future<Item = Box<TlsStream>, Error = io::Error> + Send + 'static> + Send + 'static>(host: String, port: u16, connector: Connector) -> Box<dyn Future<Item = Self, Error = Error> + Send + 'static> {
167 Box::new(
168 open_tcp_stream(host.clone(), port).and_then(move |stream| {
169 connector(host, stream).map(AMQPStream::Tls).map_err(|e| ErrorKind::ConnectionFailed(e).into())
170 })
171 )
172 }
173}
174
175impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> Read for AMQPStream<TlsStream> {
176 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
177 match *self {
178 AMQPStream::Raw(ref mut raw) => raw.read(buf),
179 AMQPStream::Tls(ref mut tls) => tls.read(buf),
180 }
181 }
182}
183
184impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AsyncRead for AMQPStream<TlsStream> {
185 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
186 match *self {
187 AMQPStream::Raw(ref raw) => raw.prepare_uninitialized_buffer(buf),
188 AMQPStream::Tls(ref tls) => tls.prepare_uninitialized_buffer(buf),
189 }
190 }
191
192 fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
193 match *self {
194 AMQPStream::Raw(ref mut raw) => raw.read_buf(buf),
195 AMQPStream::Tls(ref mut tls) => tls.read_buf(buf),
196 }
197 }
198}
199
200impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> Write for AMQPStream<TlsStream> {
201 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
202 match *self {
203 AMQPStream::Raw(ref mut raw) => raw.write(buf),
204 AMQPStream::Tls(ref mut tls) => tls.write(buf),
205 }
206 }
207
208 fn flush(&mut self) -> io::Result<()> {
209 match *self {
210 AMQPStream::Raw(ref mut raw) => raw.flush(),
211 AMQPStream::Tls(ref mut tls) => tls.flush(),
212 }
213 }
214}
215
216impl<TlsStream: AsyncRead + AsyncWrite + Send + 'static> AsyncWrite for AMQPStream<TlsStream> {
217 fn shutdown(&mut self) -> Poll<(), io::Error> {
218 match *self {
219 AMQPStream::Raw(ref mut raw) => raw.shutdown(),
220 AMQPStream::Tls(ref mut tls) => tls.shutdown(),
221 }
222 }
223
224 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
225 match *self {
226 AMQPStream::Raw(ref mut raw) => raw.write_buf(buf),
227 AMQPStream::Tls(ref mut tls) => tls.write_buf(buf),
228 }
229 }
230}
231
232fn open_tcp_stream(host: String, port: u16) -> Box<dyn Future<Item = TcpStream, Error = Error> + Send + 'static> {
233 let host2 = host.clone();
234 Box::new(
235 futures::future::result(AsyncResolver::from_system_conf()).and_then(move |(resolver, background)| {
236 tokio_executor::spawn(background);
237 resolver.lookup_ip(host.as_str())
238 }).map_err(|e| ErrorKind::InvalidDomainName(e.to_string()).into()).and_then(|response| {
239 response.iter().next().ok_or_else(|| ErrorKind::InvalidDomainName(host2).into())
240 }).and_then(move |ipaddr| {
241 TcpStream::connect(&SocketAddr::new(ipaddr, port)).map_err(|e| ErrorKind::ConnectionFailed(e).into())
242 })
243 )
244}