1use super::{background::Background, Connection};
2use crate::body::LiftBody;
3use futures::{try_ready, Async, Future, Poll};
4use http::Version;
5use http_body::Body as HttpBody;
6use http_connection::HttpConnection;
7use hyper::client::conn::{Builder, Handshake};
8use hyper::Error;
9use std::fmt;
10use std::marker::PhantomData;
11use tokio_executor::{DefaultExecutor, TypedExecutor};
12use tokio_io::{AsyncRead, AsyncWrite};
13use tower_http_util::connection::HttpMakeConnection;
14use tower_service::Service;
15
16#[derive(Debug)]
22pub struct Connect<A, B, C, E> {
23 inner: C,
24 builder: Builder,
25 exec: E,
26 _pd: PhantomData<(A, B)>,
27}
28
29pub trait ConnectExecutor<T, B>: TypedExecutor<Background<T, B>>
31where
32 T: AsyncRead + AsyncWrite + Send + 'static,
33 B: HttpBody + Send + 'static,
34 B::Data: Send,
35 B::Error: Into<crate::Error>,
36{
37}
38
39pub struct ConnectFuture<A, B, C, E>
41where
42 B: HttpBody,
43 C: HttpMakeConnection<A>,
44{
45 state: State<A, B, C>,
46 builder: Builder,
47 exec: E,
48}
49
50enum State<A, B, C>
51where
52 B: HttpBody,
53 C: HttpMakeConnection<A>,
54{
55 Connect(C::Future),
56 Handshake(Handshake<C::Connection, LiftBody<B>>),
57}
58
59#[derive(Debug)]
61pub enum ConnectError<T> {
62 Connect(T),
64 Handshake(Error),
66 SpawnError,
69}
70
71impl<E, T, B> ConnectExecutor<T, B> for E
74where
75 T: AsyncRead + AsyncWrite + Send + 'static,
76 B: HttpBody + Send + 'static,
77 B::Data: Send,
78 B::Error: Into<crate::Error>,
79 E: TypedExecutor<Background<T, B>>,
80{
81}
82
83impl<A, B, C> Connect<A, B, C, DefaultExecutor>
86where
87 C: HttpMakeConnection<A>,
88 B: HttpBody,
89 C::Connection: Send + 'static,
90{
91 pub fn new(inner: C) -> Self {
98 Connect::with_builder(inner, Builder::new())
99 }
100
101 pub fn with_builder(inner: C, builder: Builder) -> Self {
103 Connect::with_executor(inner, builder, DefaultExecutor::current())
104 }
105}
106
107impl<A, B, C, E> Connect<A, B, C, E>
108where
109 C: HttpMakeConnection<A>,
110 B: HttpBody,
111 C::Connection: Send + 'static,
112{
113 pub fn with_executor(inner: C, builder: Builder, exec: E) -> Self {
123 Connect {
124 inner,
125 builder,
126 exec,
127 _pd: PhantomData,
128 }
129 }
130}
131
132impl<A, B, C, E> Service<A> for Connect<A, B, C, E>
133where
134 C: HttpMakeConnection<A> + 'static,
135 B: HttpBody + Send + 'static,
136 B::Data: Send,
137 B::Error: Into<crate::Error>,
138 C::Connection: Send + 'static,
139 E: ConnectExecutor<C::Connection, B> + Clone,
140{
141 type Response = Connection<B>;
142 type Error = ConnectError<C::Error>;
143 type Future = ConnectFuture<A, B, C, E>;
144
145 fn poll_ready(&mut self) -> Poll<(), Self::Error> {
147 self.inner.poll_ready().map_err(ConnectError::Connect)
148 }
149
150 fn call(&mut self, target: A) -> Self::Future {
152 let state = State::Connect(self.inner.make_connection(target));
153 let builder = self.builder.clone();
154 let exec = self.exec.clone();
155
156 ConnectFuture {
157 state,
158 builder,
159 exec,
160 }
161 }
162}
163
164impl<A, B, C, E> Future for ConnectFuture<A, B, C, E>
167where
168 C: HttpMakeConnection<A>,
169 B: HttpBody + Send + 'static,
170 B::Data: Send,
171 B::Error: Into<crate::Error>,
172 C::Connection: Send + 'static,
173 E: ConnectExecutor<C::Connection, B>,
174{
175 type Item = Connection<B>;
176 type Error = ConnectError<C::Error>;
177
178 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
179 loop {
180 let io = match self.state {
181 State::Connect(ref mut fut) => {
182 let res = fut.poll().map_err(ConnectError::Connect);
183
184 try_ready!(res)
185 }
186 State::Handshake(ref mut fut) => {
187 let (sender, conn) = try_ready!(fut.poll().map_err(ConnectError::Handshake));
188
189 let (bg, handle) = Background::new(conn);
190 self.exec.spawn(bg).map_err(|_| ConnectError::SpawnError)?;
191
192 let connection = Connection::new(sender, handle);
193
194 return Ok(Async::Ready(connection));
195 }
196 };
197
198 let mut builder = self.builder.clone();
199
200 if let Some(Version::HTTP_2) = io.negotiated_version() {
201 builder.http2_only(true);
202 }
203
204 let handshake = builder.handshake(io);
205
206 self.state = State::Handshake(handshake);
207 }
208 }
209}
210
211impl<A, B, C, E> fmt::Debug for ConnectFuture<A, B, C, E>
212where
213 C: HttpMakeConnection<A>,
214 B: HttpBody,
215{
216 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
217 f.write_str("ConnectFuture")
218 }
219}
220
221impl<T> fmt::Display for ConnectError<T>
223where
224 T: fmt::Display,
225{
226 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
227 match *self {
228 ConnectError::Connect(ref why) => write!(
229 f,
230 "Error attempting to establish underlying session layer: {}",
231 why
232 ),
233 ConnectError::Handshake(ref why) => {
234 write!(f, "Error while performing HTTP handshake: {}", why,)
235 }
236 ConnectError::SpawnError => write!(f, "Error spawning background task"),
237 }
238 }
239}
240
241impl<T> std::error::Error for ConnectError<T>
242where
243 T: std::error::Error,
244{
245 fn description(&self) -> &str {
246 match *self {
247 ConnectError::Connect(_) => "error attempting to establish underlying session layer",
248 ConnectError::Handshake(_) => "error performing HTTP handshake",
249 ConnectError::SpawnError => "Error spawning background task",
250 }
251 }
252
253 fn cause(&self) -> Option<&std::error::Error> {
254 match *self {
255 ConnectError::Connect(ref why) => Some(why),
256 ConnectError::Handshake(ref why) => Some(why),
257 ConnectError::SpawnError => None,
258 }
259 }
260}