tower_hyper/client/
connect.rs

1use super::{background::Background, Connection};
2use crate::body::LiftBody;
3use futures::{try_ready, Async, Future, Poll};
4use http::Version;
5use http_body::Body as HttpBody;
6use http_connection::HttpConnection;
7use hyper::client::conn::{Builder, Handshake};
8use hyper::Error;
9use std::fmt;
10use std::marker::PhantomData;
11use tokio_executor::{DefaultExecutor, TypedExecutor};
12use tokio_io::{AsyncRead, AsyncWrite};
13use tower_http_util::connection::HttpMakeConnection;
14use tower_service::Service;
15
16/// Creates a `hyper` connection
17///
18/// This accepts a `hyper::client::conn::Builder` and provides
19/// a `MakeService` implementation to create connections from some
20/// target `A`
21#[derive(Debug)]
22pub struct Connect<A, B, C, E> {
23    inner: C,
24    builder: Builder,
25    exec: E,
26    _pd: PhantomData<(A, B)>,
27}
28
29/// Executor that will spawn the background connection task.
30pub trait ConnectExecutor<T, B>: TypedExecutor<Background<T, B>>
31where
32    T: AsyncRead + AsyncWrite + Send + 'static,
33    B: HttpBody + Send + 'static,
34    B::Data: Send,
35    B::Error: Into<crate::Error>,
36{
37}
38
39/// A future that resolves to the eventual connection or an error.
40pub struct ConnectFuture<A, B, C, E>
41where
42    B: HttpBody,
43    C: HttpMakeConnection<A>,
44{
45    state: State<A, B, C>,
46    builder: Builder,
47    exec: E,
48}
49
50enum State<A, B, C>
51where
52    B: HttpBody,
53    C: HttpMakeConnection<A>,
54{
55    Connect(C::Future),
56    Handshake(Handshake<C::Connection, LiftBody<B>>),
57}
58
59/// The error produced from creating a connection
60#[derive(Debug)]
61pub enum ConnectError<T> {
62    /// An error occurred while attempting to establish the connection.
63    Connect(T),
64    /// An error occurred while performing hyper's handshake.
65    Handshake(Error),
66    /// An error occurred attempting to spawn the connect task on the
67    /// provided executor.
68    SpawnError,
69}
70
71// ==== impl ConnectExecutor ====
72
73impl<E, T, B> ConnectExecutor<T, B> for E
74where
75    T: AsyncRead + AsyncWrite + Send + 'static,
76    B: HttpBody + Send + 'static,
77    B::Data: Send,
78    B::Error: Into<crate::Error>,
79    E: TypedExecutor<Background<T, B>>,
80{
81}
82
83// ===== impl Connect =====
84
85impl<A, B, C> Connect<A, B, C, DefaultExecutor>
86where
87    C: HttpMakeConnection<A>,
88    B: HttpBody,
89    C::Connection: Send + 'static,
90{
91    /// Create a new `Connect`.
92    ///
93    /// The `C` argument is used to obtain new session layer instances
94    /// (`AsyncRead` + `AsyncWrite`). For each new client service returned, a
95    /// Service is returned that can be driven by `poll_service` and to send
96    /// requests via `call`.
97    pub fn new(inner: C) -> Self {
98        Connect::with_builder(inner, Builder::new())
99    }
100
101    /// Create a new `Connect` with a builder.
102    pub fn with_builder(inner: C, builder: Builder) -> Self {
103        Connect::with_executor(inner, builder, DefaultExecutor::current())
104    }
105}
106
107impl<A, B, C, E> Connect<A, B, C, E>
108where
109    C: HttpMakeConnection<A>,
110    B: HttpBody,
111    C::Connection: Send + 'static,
112{
113    /// Create a new `Connect`.
114    ///
115    /// The `C` argument is used to obtain new session layer instances
116    /// (`AsyncRead` + `AsyncWrite`). For each new client service returned, a
117    /// Service is returned that can be driven by `poll_service` and to send
118    /// requests via `call`.
119    ///
120    /// The `E` argument is the executor that the background task for the connection
121    /// will be spawned on.
122    pub fn with_executor(inner: C, builder: Builder, exec: E) -> Self {
123        Connect {
124            inner,
125            builder,
126            exec,
127            _pd: PhantomData,
128        }
129    }
130}
131
132impl<A, B, C, E> Service<A> for Connect<A, B, C, E>
133where
134    C: HttpMakeConnection<A> + 'static,
135    B: HttpBody + Send + 'static,
136    B::Data: Send,
137    B::Error: Into<crate::Error>,
138    C::Connection: Send + 'static,
139    E: ConnectExecutor<C::Connection, B> + Clone,
140{
141    type Response = Connection<B>;
142    type Error = ConnectError<C::Error>;
143    type Future = ConnectFuture<A, B, C, E>;
144
145    /// Check if the `MakeConnection` is ready for a new connection.
146    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
147        self.inner.poll_ready().map_err(ConnectError::Connect)
148    }
149
150    /// Obtains a Connection on a single plaintext h2 connection to a remote.
151    fn call(&mut self, target: A) -> Self::Future {
152        let state = State::Connect(self.inner.make_connection(target));
153        let builder = self.builder.clone();
154        let exec = self.exec.clone();
155
156        ConnectFuture {
157            state,
158            builder,
159            exec,
160        }
161    }
162}
163
164// ===== impl ConnectFuture =====
165
166impl<A, B, C, E> Future for ConnectFuture<A, B, C, E>
167where
168    C: HttpMakeConnection<A>,
169    B: HttpBody + Send + 'static,
170    B::Data: Send,
171    B::Error: Into<crate::Error>,
172    C::Connection: Send + 'static,
173    E: ConnectExecutor<C::Connection, B>,
174{
175    type Item = Connection<B>;
176    type Error = ConnectError<C::Error>;
177
178    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
179        loop {
180            let io = match self.state {
181                State::Connect(ref mut fut) => {
182                    let res = fut.poll().map_err(ConnectError::Connect);
183
184                    try_ready!(res)
185                }
186                State::Handshake(ref mut fut) => {
187                    let (sender, conn) = try_ready!(fut.poll().map_err(ConnectError::Handshake));
188
189                    let (bg, handle) = Background::new(conn);
190                    self.exec.spawn(bg).map_err(|_| ConnectError::SpawnError)?;
191
192                    let connection = Connection::new(sender, handle);
193
194                    return Ok(Async::Ready(connection));
195                }
196            };
197
198            let mut builder = self.builder.clone();
199
200            if let Some(Version::HTTP_2) = io.negotiated_version() {
201                builder.http2_only(true);
202            }
203
204            let handshake = builder.handshake(io);
205
206            self.state = State::Handshake(handshake);
207        }
208    }
209}
210
211impl<A, B, C, E> fmt::Debug for ConnectFuture<A, B, C, E>
212where
213    C: HttpMakeConnection<A>,
214    B: HttpBody,
215{
216    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
217        f.write_str("ConnectFuture")
218    }
219}
220
221// ==== impl ConnectError ====
222impl<T> fmt::Display for ConnectError<T>
223where
224    T: fmt::Display,
225{
226    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
227        match *self {
228            ConnectError::Connect(ref why) => write!(
229                f,
230                "Error attempting to establish underlying session layer: {}",
231                why
232            ),
233            ConnectError::Handshake(ref why) => {
234                write!(f, "Error while performing HTTP handshake: {}", why,)
235            }
236            ConnectError::SpawnError => write!(f, "Error spawning background task"),
237        }
238    }
239}
240
241impl<T> std::error::Error for ConnectError<T>
242where
243    T: std::error::Error,
244{
245    fn description(&self) -> &str {
246        match *self {
247            ConnectError::Connect(_) => "error attempting to establish underlying session layer",
248            ConnectError::Handshake(_) => "error performing HTTP handshake",
249            ConnectError::SpawnError => "Error spawning background task",
250        }
251    }
252
253    fn cause(&self) -> Option<&std::error::Error> {
254        match *self {
255            ConnectError::Connect(ref why) => Some(why),
256            ConnectError::Handshake(ref why) => Some(why),
257            ConnectError::SpawnError => None,
258        }
259    }
260}