1use std::{
4 future::Future,
5 marker::PhantomData,
6 pin::Pin,
7 sync::Arc,
8 task::{ready, Context, Poll},
9};
10
11use http::{Request, Response};
12use http_body::Body;
13use tokio::io::{AsyncRead, AsyncWrite};
14
15use crate::{
16 body::Incoming,
17 dispatch::{self, TrySendError},
18 error::{BoxError, Error},
19 proto::{
20 self,
21 http2::{ping, Http2Options},
22 },
23 rt::{bounds::Http2ClientConnExec, Time, Timer},
24 Result,
25};
26
27pub struct SendRequest<B> {
29 dispatch: dispatch::UnboundedSender<Request<B>, Response<Incoming>>,
30}
31
32impl<B> Clone for SendRequest<B> {
33 #[inline]
34 fn clone(&self) -> SendRequest<B> {
35 SendRequest {
36 dispatch: self.dispatch.clone(),
37 }
38 }
39}
40
41#[must_use = "futures do nothing unless polled"]
46pub struct Connection<T, B, E>
47where
48 T: AsyncRead + AsyncWrite + Unpin,
49 B: Body + 'static,
50 E: Http2ClientConnExec<B, T> + Unpin,
51 B::Error: Into<BoxError>,
52{
53 inner: (PhantomData<T>, proto::http2::client::ClientTask<B, E, T>),
54}
55
56#[derive(Clone)]
63pub struct Builder<Ex> {
64 exec: Ex,
65 timer: Time,
66 opts: Http2Options,
67}
68
69impl<B> SendRequest<B> {
72 #[inline]
76 pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
77 if self.is_closed() {
78 Poll::Ready(Err(Error::new_closed()))
79 } else {
80 Poll::Ready(Ok(()))
81 }
82 }
83
84 #[inline]
88 pub async fn ready(&mut self) -> Result<()> {
89 std::future::poll_fn(|cx| self.poll_ready(cx)).await
90 }
91
92 #[inline]
100 pub fn is_ready(&self) -> bool {
101 self.dispatch.is_ready()
102 }
103
104 #[inline]
106 pub fn is_closed(&self) -> bool {
107 self.dispatch.is_closed()
108 }
109}
110
111impl<B> SendRequest<B>
112where
113 B: Body + 'static,
114{
115 pub fn try_send_request(
124 &mut self,
125 req: Request<B>,
126 ) -> impl Future<Output = Result<Response<Incoming>, TrySendError<Request<B>>>> {
127 let sent = self.dispatch.try_send(req);
128 async move {
129 match sent {
130 Ok(rx) => match rx.await {
131 Ok(Ok(res)) => Ok(res),
132 Ok(Err(err)) => Err(err),
133 Err(_) => panic!("dispatch dropped without returning error"),
135 },
136 Err(req) => {
137 debug!("connection was not ready");
138 let error = Error::new_canceled().with("connection was not ready");
139 Err(TrySendError {
140 error,
141 message: Some(req),
142 })
143 }
144 }
145 }
146 }
147}
148
149impl<T, B, E> Future for Connection<T, B, E>
152where
153 T: AsyncRead + AsyncWrite + Unpin + 'static,
154 B: Body + 'static + Unpin,
155 B::Data: Send,
156 E: Unpin,
157 B::Error: Into<BoxError>,
158 E: Http2ClientConnExec<B, T> + Unpin,
159{
160 type Output = Result<()>;
161
162 #[inline]
163 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
164 match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
165 proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
166 proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
167 }
168 }
169}
170
171impl<Ex> Builder<Ex>
174where
175 Ex: Clone,
176{
177 #[inline]
179 pub fn new(exec: Ex) -> Builder<Ex> {
180 Builder {
181 exec,
182 timer: Time::Empty,
183 opts: Default::default(),
184 }
185 }
186
187 #[inline]
189 pub fn timer<M>(mut self, timer: M) -> Self
190 where
191 M: Timer + Send + Sync + 'static,
192 {
193 self.timer = Time::Timer(Arc::new(timer));
194 self
195 }
196
197 #[inline]
199 pub fn options(mut self, opts: Http2Options) -> Self {
200 self.opts = opts;
201 self
202 }
203
204 pub async fn handshake<T, B>(self, io: T) -> Result<(SendRequest<B>, Connection<T, B, Ex>)>
209 where
210 T: AsyncRead + AsyncWrite + Unpin,
211 B: Body + 'static,
212 B::Data: Send,
213 B::Error: Into<BoxError>,
214 Ex: Http2ClientConnExec<B, T> + Unpin,
215 {
216 trace!("client handshake HTTP/2");
217
218 let mut builder = http2::client::Builder::default();
220 builder
221 .initial_max_send_streams(self.opts.initial_max_send_streams)
222 .initial_window_size(self.opts.initial_window_size)
223 .initial_connection_window_size(self.opts.initial_conn_window_size)
224 .max_send_buffer_size(self.opts.max_send_buffer_size);
225 if let Some(id) = self.opts.initial_stream_id {
226 builder.initial_stream_id(id);
227 }
228 if let Some(max) = self.opts.max_pending_accept_reset_streams {
229 builder.max_pending_accept_reset_streams(max);
230 }
231 if let Some(max) = self.opts.max_concurrent_reset_streams {
232 builder.max_concurrent_reset_streams(max);
233 }
234 if let Some(max) = self.opts.max_concurrent_streams {
235 builder.max_concurrent_streams(max);
236 }
237 if let Some(max) = self.opts.max_header_list_size {
238 builder.max_header_list_size(max);
239 }
240 if let Some(opt) = self.opts.enable_push {
241 builder.enable_push(opt);
242 }
243 if let Some(max) = self.opts.max_frame_size {
244 builder.max_frame_size(max);
245 }
246 if let Some(max) = self.opts.header_table_size {
247 builder.header_table_size(max);
248 }
249 if let Some(v) = self.opts.enable_connect_protocol {
250 builder.enable_connect_protocol(v);
251 }
252 if let Some(v) = self.opts.no_rfc7540_priorities {
253 builder.no_rfc7540_priorities(v);
254 }
255 if let Some(order) = self.opts.settings_order {
256 builder.settings_order(order);
257 }
258 if let Some(stream_dependency) = self.opts.headers_stream_dependency {
259 builder.headers_stream_dependency(stream_dependency);
260 }
261 if let Some(order) = self.opts.headers_pseudo_order {
262 builder.headers_pseudo_order(order);
263 }
264 if let Some(priority) = self.opts.priorities {
265 builder.priorities(priority);
266 }
267
268 let ping_config = ping::Config::new(
270 self.opts.adaptive_window,
271 self.opts.initial_window_size,
272 self.opts.keep_alive_interval,
273 self.opts.keep_alive_timeout,
274 self.opts.keep_alive_while_idle,
275 );
276
277 let (tx, rx) = dispatch::channel();
278 let h2 =
279 proto::http2::client::handshake(io, rx, builder, ping_config, self.exec, self.timer)
280 .await?;
281 Ok((
282 SendRequest {
283 dispatch: tx.unbound(),
284 },
285 Connection {
286 inner: (PhantomData, h2),
287 },
288 ))
289 }
290}