1use std::{
2 convert::Infallible,
3 future::Future,
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7 time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_core::{ready, FusedFuture, FusedStream, Stream};
15use h2::client::{Builder, Connection, SendRequest};
16use h2::SendStream;
17use http::{Method, StatusCode};
18use pin_project_lite::pin_project;
19
20use super::ping::{Ponger, Recorder};
21use super::{ping, PipeToSendStream, SendBuf};
22use crate::body::{Body, Incoming as IncomingBody};
23use crate::client::dispatch::{Callback, SendWhen, TrySendError};
24use crate::common::either::Either;
25use crate::common::io::Compat;
26use crate::common::time::Time;
27use crate::ext::Protocol;
28use crate::headers;
29use crate::proto::Dispatched;
30use crate::rt::bounds::{Http2ClientConnExec, Http2UpgradedExec};
31use crate::upgrade::Upgraded;
32use crate::{Request, Response};
33use h2::client::ResponseFuture;
34
/// Receiving half of the client dispatch channel. Each item pairs an outgoing
/// `Request<B>` with the callback used to deliver its
/// `Response<IncomingBody>` or an error.
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;

/// Sender half of an mpsc channel whose item type is `Infallible`, so no
/// value can ever be sent through it. `ClientTask` holds one and every
/// spawned `PipeMap` holds a clone; the receiving half lives in `ConnTask`,
/// which reacts once that receiver reports readiness.
type ConnDropRef = mpsc::Sender<Infallible>;

/// Receiver half of a oneshot channel whose item type is `Infallible`.
/// The sending half is owned by `ConnTask`; `ClientTask` polls this receiver
/// and shuts down when it resolves with an error (the sender was dropped).
type ConnEof = oneshot::Receiver<Infallible>;
44
// Default HTTP/2 client settings used by `Config::default()`.

/// Default connection-level window size: 5 MiB.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024;
/// Default per-stream window size: 2 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024;
/// Default maximum frame size: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;
/// Default maximum send buffer size: 1 MiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024;
/// Default maximum header list size: 16 KiB.
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024;
/// Default value for `Config::initial_max_send_streams`.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
62
/// Settings for an HTTP/2 client connection.
///
/// Most fields are forwarded to the `h2` client `Builder` by `new_builder`;
/// `adaptive_window` and the `keep_alive_*` fields are consumed by
/// `new_ping_config`.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, `new_ping_config` seeds `bdp_initial_window` with
    /// `initial_stream_window_size`; when `false` it is left as `None`.
    pub(crate) adaptive_window: bool,
    /// Passed to `Builder::initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to `Builder::initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to `Builder::initial_max_send_streams`.
    pub(crate) initial_max_send_streams: usize,
    /// Passed to `Builder::max_frame_size` when `Some`; otherwise the
    /// builder's own default is kept.
    pub(crate) max_frame_size: Option<u32>,
    /// Passed to `Builder::max_header_list_size`.
    pub(crate) max_header_list_size: u32,
    /// Copied into `ping::Config::keep_alive_interval`.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Copied into `ping::Config::keep_alive_timeout`.
    pub(crate) keep_alive_timeout: Duration,
    /// Copied into `ping::Config::keep_alive_while_idle`.
    pub(crate) keep_alive_while_idle: bool,
    /// Passed to `Builder::max_concurrent_reset_streams` when `Some`.
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    /// Passed to `Builder::max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Passed to `Builder::max_pending_accept_reset_streams` when `Some`.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Passed as-is (including `None`) to
    /// `Builder::max_local_error_reset_streams`.
    pub(crate) max_local_error_reset_streams: Option<usize>,
    /// Passed to `Builder::header_table_size` when `Some`.
    pub(crate) header_table_size: Option<u32>,
    /// Passed to `Builder::max_concurrent_streams` when `Some`.
    pub(crate) max_concurrent_streams: Option<u32>,
}
81
82impl Default for Config {
83 fn default() -> Config {
84 Config {
85 adaptive_window: false,
86 initial_conn_window_size: DEFAULT_CONN_WINDOW,
87 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
88 initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
89 max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
90 max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
91 keep_alive_interval: None,
92 keep_alive_timeout: Duration::from_secs(20),
93 keep_alive_while_idle: false,
94 max_concurrent_reset_streams: None,
95 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
96 max_pending_accept_reset_streams: None,
97 max_local_error_reset_streams: Some(1024),
98 header_table_size: None,
99 max_concurrent_streams: None,
100 }
101 }
102}
103
104fn new_builder(config: &Config) -> Builder {
105 let mut builder = Builder::default();
106 builder
107 .initial_max_send_streams(config.initial_max_send_streams)
108 .initial_window_size(config.initial_stream_window_size)
109 .initial_connection_window_size(config.initial_conn_window_size)
110 .max_header_list_size(config.max_header_list_size)
111 .max_send_buffer_size(config.max_send_buffer_size)
112 .max_local_error_reset_streams(config.max_local_error_reset_streams)
113 .enable_push(false);
114 if let Some(max) = config.max_frame_size {
115 builder.max_frame_size(max);
116 }
117 if let Some(max) = config.max_concurrent_reset_streams {
118 builder.max_concurrent_reset_streams(max);
119 }
120 if let Some(max) = config.max_pending_accept_reset_streams {
121 builder.max_pending_accept_reset_streams(max);
122 }
123 if let Some(size) = config.header_table_size {
124 builder.header_table_size(size);
125 }
126 if let Some(max) = config.max_concurrent_streams {
127 builder.max_concurrent_streams(max);
128 }
129 builder
130}
131
132fn new_ping_config(config: &Config) -> ping::Config {
133 ping::Config {
134 bdp_initial_window: if config.adaptive_window {
135 Some(config.initial_stream_window_size)
136 } else {
137 None
138 },
139 keep_alive_interval: config.keep_alive_interval,
140 keep_alive_timeout: config.keep_alive_timeout,
141 keep_alive_while_idle: config.keep_alive_while_idle,
142 }
143}
144
145pub(crate) async fn handshake<T, B, E>(
146 io: T,
147 req_rx: ClientRx<B>,
148 config: &Config,
149 mut exec: E,
150 timer: Time,
151) -> crate::Result<ClientTask<B, E, T>>
152where
153 T: Read + Write + Unpin,
154 B: Body + 'static,
155 B::Data: Send + 'static,
156 E: Http2ClientConnExec<B, T> + Clone + Unpin,
157 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
158{
159 let (h2_tx, mut conn) = new_builder(config)
160 .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
161 .await
162 .map_err(crate::Error::new_h2)?;
163
164 let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1);
169 let (cancel_tx, conn_eof) = oneshot::channel();
170
171 let ping_config = new_ping_config(config);
172
173 let (conn, ping) = if ping_config.is_enabled() {
174 let pp = conn.ping_pong().expect("conn.ping_pong");
175 let (recorder, ponger) = ping::channel(pp, ping_config, timer);
176
177 let conn: Conn<_, B> = Conn::new(ponger, conn);
178 (Either::left(conn), recorder)
179 } else {
180 (Either::right(conn), ping::disabled())
181 };
182 let conn: ConnMapErr<T, B> = ConnMapErr {
183 conn,
184 is_terminated: false,
185 };
186
187 exec.execute_h2_future(H2ClientFuture::Task {
188 task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
189 });
190
191 Ok(ClientTask {
192 ping,
193 conn_drop_ref,
194 conn_eof,
195 executor: exec,
196 h2_tx,
197 req_rx,
198 fut_ctx: None,
199 marker: PhantomData,
200 })
201}
202
pin_project! {
    // An `h2` client connection paired with the `Ponger` that is polled
    // alongside it. Built by `handshake` only when the ping configuration
    // is enabled.
    struct Conn<T, B>
    where
        B: Body,
    {
        // Polled first on every `poll`; its result can resize the
        // connection window or end the connection (keep-alive timeout).
        #[pin]
        ponger: Ponger,
        // The underlying `h2` connection future.
        #[pin]
        conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
    }
}
214
215impl<T, B> Conn<T, B>
216where
217 B: Body,
218 T: Read + Write + Unpin,
219{
220 fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
221 Conn { ponger, conn }
222 }
223}
224
225impl<T, B> Future for Conn<T, B>
226where
227 B: Body,
228 T: Read + Write + Unpin,
229{
230 type Output = Result<(), h2::Error>;
231
232 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
233 let mut this = self.project();
234 match this.ponger.poll(cx) {
235 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
236 this.conn.set_target_window_size(wnd);
237 this.conn.set_initial_window_size(wnd)?;
238 }
239 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
240 debug!("connection keep-alive timed out");
241 return Poll::Ready(Ok(()));
242 }
243 Poll::Pending => {}
244 }
245
246 Pin::new(&mut this.conn).poll(cx)
247 }
248}
249
pin_project! {
    // Wraps the connection future (with or without ping support), discards
    // the concrete error, and remembers whether it has already completed so
    // it can implement `FusedFuture`.
    struct ConnMapErr<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Left: connection polled together with a `Ponger`.
        // Right: the bare `h2` connection.
        #[pin]
        conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
        // Set to `true` by `poll` once `conn` has resolved.
        #[pin]
        is_terminated: bool,
    }
}
264
265impl<T, B> Future for ConnMapErr<T, B>
266where
267 B: Body,
268 T: Read + Write + Unpin,
269{
270 type Output = Result<(), ()>;
271
272 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
273 let mut this = self.project();
274
275 if *this.is_terminated {
276 return Poll::Pending;
277 }
278 let polled = this.conn.poll(cx);
279 if polled.is_ready() {
280 *this.is_terminated = true;
281 }
282 polled.map_err(|_e| {
283 debug!(error = %_e, "connection error");
284 })
285 }
286}
287
/// Exposes whether the wrapped connection future has already resolved, so
/// callers can check before polling it again.
impl<T, B> FusedFuture for ConnMapErr<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    /// Returns the flag that `poll` sets once the inner connection resolved.
    fn is_terminated(&self) -> bool {
        self.is_terminated
    }
}
297
pin_project! {
    // Background future that drives the HTTP/2 connection. `handshake`
    // spawns it on the executor as `H2ClientFuture::Task`.
    pub struct ConnTask<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Receiving half paired with the `ConnDropRef` senders. Its item
        // type is `Infallible`, so no value can ever arrive on it.
        #[pin]
        drop_rx: Receiver<Infallible>,
        // Sending half paired with `ClientTask::conn_eof`. Never used to
        // send; it is dropped when `drop_rx` becomes ready.
        #[pin]
        cancel_tx: Option<oneshot::Sender<Infallible>>,
        // The connection future itself.
        #[pin]
        conn: ConnMapErr<T, B>,
    }
}
314
315impl<T, B> ConnTask<T, B>
316where
317 B: Body,
318 T: Read + Write + Unpin,
319{
320 fn new(
321 conn: ConnMapErr<T, B>,
322 drop_rx: Receiver<Infallible>,
323 cancel_tx: oneshot::Sender<Infallible>,
324 ) -> Self {
325 Self {
326 drop_rx,
327 cancel_tx: Some(cancel_tx),
328 conn,
329 }
330 }
331}
332
333impl<T, B> Future for ConnTask<T, B>
334where
335 B: Body,
336 T: Read + Write + Unpin,
337{
338 type Output = ();
339
340 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341 let mut this = self.project();
342
343 if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() {
344 return Poll::Ready(());
346 }
347
348 if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
349 trace!("send_request dropped, starting conn shutdown");
353 drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
354 }
355
356 Poll::Pending
357 }
358}
359
pin_project! {
    // The single future type handed to the executor by the HTTP/2 client.
    // Each variant wraps one kind of background work; `poll` simply
    // forwards to the wrapped future.
    #[project = H2ClientFutureProject]
    pub enum H2ClientFuture<B, T, E>
    where
        B: http_body::Body,
        B: 'static,
        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Streams a request body into its `h2` send stream.
        Pipe {
            #[pin]
            pipe: PipeMap<B>,
        },
        // Waits for a response and completes the request's callback.
        Send {
            #[pin]
            send_when: SendWhen<B, E>,
        },
        // Drives the HTTP/2 connection itself.
        Task {
            #[pin]
            task: ConnTask<T, B>,
        },
    }
}
385
386impl<B, T, E> Future for H2ClientFuture<B, T, E>
387where
388 B: http_body::Body + 'static,
389 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
390 T: Read + Write + Unpin,
391 E: Http2UpgradedExec<B::Data>,
392{
393 type Output = ();
394
395 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
396 let this = self.project();
397
398 match this {
399 H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
400 H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
401 H2ClientFutureProject::Task { task } => task.poll(cx),
402 }
403 }
404}
405
/// Everything needed to finish dispatching one request after
/// `SendRequest::send_request` has been called for it.
///
/// `ClientTask::poll` builds one per request and either hands it straight to
/// `poll_pipe` or parks it in `ClientTask::fut_ctx` until the send handle is
/// ready again.
struct FutCtx<B>
where
    B: Body,
{
    /// `true` when the request method is `CONNECT`.
    is_connect: bool,
    /// Value of `body.is_end_stream()` taken before the request was sent.
    eos: bool,
    /// Response future returned by `send_request`.
    fut: ResponseFuture,
    /// Send stream returned by `send_request`.
    body_tx: SendStream<SendBuf<B::Data>>,
    /// The request body.
    body: B,
    /// Callback used to deliver the response or error to the caller.
    cb: Callback<Request<B>, Response<IncomingBody>>,
}

// Declared `Unpin` unconditionally, regardless of whether `B` is `Unpin`.
impl<B: Body> Unpin for FutCtx<B> {}
419
/// The HTTP/2 client dispatch task: pulls requests from `req_rx`, sends them
/// over `h2_tx`, and spawns the per-request futures on `executor`.
///
/// Created by `handshake`. The connection itself is driven by a separate
/// `ConnTask`.
pub(crate) struct ClientTask<B, E, T>
where
    B: Body,
    E: Unpin,
{
    /// Ping recorder; cloned into each request's spawned futures.
    ping: ping::Recorder,
    /// Cloned into every spawned `PipeMap`.
    conn_drop_ref: ConnDropRef,
    /// Resolves with an error once the connection task drops its sender.
    conn_eof: ConnEof,
    /// Executor used to spawn the body-pipe and response futures.
    executor: E,
    /// Handle used to check readiness and send requests on the connection.
    h2_tx: SendRequest<SendBuf<B::Data>>,
    /// Incoming requests paired with their response callbacks.
    req_rx: ClientRx<B>,
    /// A request that has been sent but not yet handed to `poll_pipe`
    /// because `h2_tx.poll_ready` was pending afterwards.
    fut_ctx: Option<FutCtx<B>>,
    /// Ties the IO type `T` to this task without storing a value of it.
    marker: PhantomData<T>,
}
434
impl<B, E, T> ClientTask<B, E, T>
where
    B: Body + 'static,
    E: Http2ClientConnExec<B, T> + Unpin,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    T: Read + Write + Unpin,
{
    /// Delegates to `SendRequest::is_extended_connect_protocol_enabled`.
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
        self.h2_tx.is_extended_connect_protocol_enabled()
    }
    /// Delegates to `SendRequest::current_max_send_streams`.
    pub(crate) fn current_max_send_streams(&self) -> usize {
        self.h2_tx.current_max_send_streams()
    }
    /// Delegates to `SendRequest::current_max_recv_streams`.
    pub(crate) fn current_max_recv_streams(&self) -> usize {
        self.h2_tx.current_max_recv_streams()
    }
}
452
pin_project! {
    // Background future that streams one request body into its `h2` send
    // stream. `poll_pipe` spawns it when the body cannot be fully sent by
    // an eager first poll.
    pub struct PipeMap<S>
    where
        S: Body,
    {
        // The body-to-stream pipe being driven.
        #[pin]
        pipe: PipeToSendStream<S>,
        // Held while the body is still sending; dropped when this future
        // finishes.
        #[pin]
        conn_drop_ref: Option<Sender<Infallible>>,
        // Ping recorder clone held for the same duration as `conn_drop_ref`.
        #[pin]
        ping: Option<Recorder>,
        // Fires with `Ok(())` when the matching `ResponseFutMap::cancel` is
        // called. Set to `None` once the sender is dropped without sending.
        cancel_rx: Option<oneshot::Receiver<()>>,
    }
}
467
468impl<B> Future for PipeMap<B>
469where
470 B: http_body::Body,
471 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
472{
473 type Output = ();
474
475 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
476 let mut this = self.project();
477
478 let cancel_result = this.cancel_rx.as_mut().map(|rx| Pin::new(rx).poll(cx));
482 match cancel_result {
483 Some(Poll::Ready(Ok(()))) => {
484 debug!("client request body send cancelled, resetting stream");
485 this.pipe.as_mut().send_reset(h2::Reason::CANCEL);
486 drop(this.conn_drop_ref.take().expect("Future polled twice"));
487 drop(this.ping.take().expect("Future polled twice"));
488 return Poll::Ready(());
489 }
490 Some(Poll::Ready(Err(_))) => {
491 *this.cancel_rx = None;
494 }
495 Some(Poll::Pending) | None => {}
496 }
497
498 match Pin::new(&mut this.pipe).poll(cx) {
499 Poll::Ready(result) => {
500 if let Err(_e) = result {
501 debug!("client request body error: {}", _e);
502 }
503 drop(this.conn_drop_ref.take().expect("Future polled twice"));
504 drop(this.ping.take().expect("Future polled twice"));
505 return Poll::Ready(());
506 }
507 Poll::Pending => (),
508 };
509 Poll::Pending
510 }
511}
512
513impl<B, E, T> ClientTask<B, E, T>
514where
515 B: Body + 'static + Unpin,
516 B::Data: Send,
517 E: Http2ClientConnExec<B, T> + Clone + Unpin,
518 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
519 T: Read + Write + Unpin,
520{
521 fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
522 let ping = self.ping.clone();
523
524 let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
527
528 let send_stream = if !f.is_connect {
529 if !f.eos {
530 let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
531
532 match Pin::new(&mut pipe).poll(cx) {
535 Poll::Ready(_) => (),
536 Poll::Pending => {
537 let conn_drop_ref = self.conn_drop_ref.clone();
538 let ping = ping.clone();
542
543 let pipe = PipeMap {
544 pipe,
545 conn_drop_ref: Some(conn_drop_ref),
546 ping: Some(ping),
547 cancel_rx: Some(cancel_rx),
548 };
549 self.executor
551 .execute_h2_future(H2ClientFuture::Pipe { pipe });
552 }
553 }
554 }
555
556 None
557 } else {
558 Some(f.body_tx)
559 };
560
561 self.executor.execute_h2_future(H2ClientFuture::Send {
562 send_when: SendWhen {
563 when: ResponseFutMap {
564 fut: f.fut,
565 ping: Some(ping),
566 send_stream: Some(send_stream),
567 exec: self.executor.clone(),
568 cancel_tx: Some(cancel_tx),
569 },
570 call_back: Some(f.cb),
571 },
572 });
573 }
574}
575
pin_project! {
    // Future that waits for the `h2` response to one request and converts
    // it into a hyper `Response` (or an error). Built by `poll_pipe`.
    pub(crate) struct ResponseFutMap<B, E>
    where
        B: Body,
        B: 'static,
    {
        // The `h2` response future for this request.
        #[pin]
        fut: ResponseFuture,
        // Ping recorder; taken when `fut` resolves.
        ping: Option<Recorder>,
        // Outer `Option`: taken when `fut` resolves.
        // Inner `Option`: `poll_pipe` sets it to `Some` only for `CONNECT`
        // requests.
        #[pin]
        send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
        // Executor used to spawn the upgrade task.
        exec: E,
        // Sender half of the cancel channel; used by `cancel`.
        cancel_tx: Option<oneshot::Sender<()>>,
    }
}
591
592impl<B: Body + 'static, E> ResponseFutMap<B, E> {
593 pub(crate) fn cancel(self: Pin<&mut Self>) {
595 if let Some(cancel_tx) = self.project().cancel_tx.take() {
596 let _ = cancel_tx.send(());
597 }
598 }
599}
600
impl<B, E> Future for ResponseFutMap<B, E>
where
    B: Body + 'static,
    E: Http2UpgradedExec<B::Data>,
{
    /// On failure the error is paired with an optional request; every error
    /// path in this impl supplies `None`.
    type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;

    /// Waits for the `h2` response and turns it into a hyper response.
    ///
    /// * When a send stream was kept (see `ResponseFutMap::send_stream`) and
    ///   the status is `200 OK`, the response becomes an upgrade: the body
    ///   is replaced by an empty one, an upgrade task is spawned on `exec`,
    ///   and the `on_upgrade` handle is stored in the response extensions.
    ///   A non-zero `content-length` in that case resets the stream and
    ///   yields an error.
    /// * Otherwise the receive stream is wrapped as the response body.
    /// * If `fut` fails, the error from `ping.ensure_not_timed_out()` takes
    ///   precedence over the `h2` error.
    ///
    /// # Panics
    ///
    /// Panics with `"Future polled twice"` if polled again after resolving.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.as_mut().project();

        let result = ready!(this.fut.poll(cx));

        // `fut` has resolved: take the one-shot state out of the struct.
        let ping = this.ping.take().expect("Future polled twice");
        let send_stream = this.send_stream.take().expect("Future polled twice");

        match result {
            Ok(res) => {
                // Note on the ping recorder that response headers arrived.
                ping.record_non_data();

                let content_length = headers::content_length_parse_all(res.headers());
                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
                    // Upgrade path: a kept send stream plus `200 OK`.
                    if content_length.map_or(false, |len| len != 0) {
                        warn!("h2 connect response with non-zero body not supported");

                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
                        return Poll::Ready(Err((
                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
                            None::<Request<B>>,
                        )));
                    }
                    // The receive stream goes to the upgraded IO, so the
                    // response itself carries an empty body.
                    let (parts, recv_stream) = res.into_parts();
                    let mut res = Response::from_parts(parts, IncomingBody::empty());

                    let (pending, on_upgrade) = crate::upgrade::pending();

                    let (h2_up, up_task) = super::upgrade::pair(send_stream, recv_stream, ping);
                    self.exec.execute_upgrade(up_task);
                    let upgraded = Upgraded::new(h2_up, Bytes::new());

                    // The upgrade is fulfilled right away; the caller picks
                    // it up through the `on_upgrade` extension.
                    pending.fulfill(upgraded);
                    res.extensions_mut().insert(on_upgrade);

                    Poll::Ready(Ok(res))
                } else {
                    // Regular response: wrap the receive stream as the body.
                    let res = res.map(|stream| {
                        let ping = ping.for_stream(&stream);
                        IncomingBody::h2(stream, content_length.into(), ping)
                    });
                    Poll::Ready(Ok(res))
                }
            }
            Err(err) => {
                // Return the ping recorder's error, if any, in place of
                // the h2 error.
                ping.ensure_not_timed_out().map_err(|e| (e, None))?;

                debug!("client response error: {}", err);
                Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
            }
        }
    }
}
662
impl<B, E, T> Future for ClientTask<B, E, T>
where
    B: Body + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    E: Http2ClientConnExec<B, T> + Clone + Unpin,
    T: Read + Write + Unpin,
{
    type Output = crate::Result<Dispatched>;

    /// Dispatch loop: waits for the connection to accept a new stream, then
    /// takes the next request from `req_rx`, sends it, and spawns its
    /// background futures through `poll_pipe`.
    ///
    /// Resolves with `Dispatched::Shutdown` when the connection closes with
    /// `NO_ERROR`, when the request sender is dropped, or when the connection
    /// task has gone away (`conn_eof`). Resolves with an error when
    /// `poll_ready` fails for any other reason, or when the ping recorder
    /// reports an error at that point.
    ///
    /// Per-request failures are delivered through that request's callback
    /// and do not end the loop.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            // Wait until the send handle is ready before doing anything else.
            match ready!(self.h2_tx.poll_ready(cx)) {
                Ok(()) => (),
                Err(err) => {
                    // An error from the ping recorder takes precedence.
                    self.ping.ensure_not_timed_out()?;
                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
                        trace!("connection gracefully shutdown");
                        Poll::Ready(Ok(Dispatched::Shutdown))
                    } else {
                        Poll::Ready(Err(crate::Error::new_h2(err)))
                    };
                }
            };

            // Resume a request that was parked on an earlier poll because
            // the send handle was not ready right after sending it.
            if let Some(f) = self.fut_ctx.take() {
                self.poll_pipe(f, cx);
                continue;
            }

            match self.req_rx.poll_recv(cx) {
                Poll::Ready(Some((req, cb))) => {
                    // Skip requests whose callback is already canceled.
                    if cb.is_canceled() {
                        trace!("request callback is canceled");
                        continue;
                    }
                    // Split off the body; h2 is given a body-less request.
                    let (head, body) = req.into_parts();
                    let mut req = ::http::Request::from_parts(head, ());
                    super::strip_connection_headers(req.headers_mut(), true);
                    // With an exact body size, add a missing content-length.
                    // For a zero-length body this is only done when the
                    // method has defined payload semantics.
                    if let Some(len) = body.size_hint().exact() {
                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
                            headers::set_content_length_if_missing(req.headers_mut(), len);
                        }
                    }

                    let is_connect = req.method() == Method::CONNECT;
                    let eos = body.is_end_stream();

                    // Reject CONNECT requests that declare a non-empty body.
                    if is_connect
                        && headers::content_length_parse_all(req.headers())
                            .map_or(false, |len| len != 0)
                    {
                        debug!("h2 connect request with non-zero body not supported");
                        cb.send(Err(TrySendError {
                            error: crate::Error::new_user_invalid_connect(),
                            message: None,
                        }));
                        continue;
                    }

                    // Replace hyper's `Protocol` extension with its inner
                    // value before handing the request to h2.
                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
                        req.extensions_mut().insert(protocol.into_inner());
                    }

                    // The second argument is `true` only for a non-CONNECT
                    // request whose body is already at end-of-stream.
                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
                        Ok(ok) => ok,
                        Err(err) => {
                            debug!("client send request error: {}", err);
                            cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    };

                    let f = FutCtx {
                        is_connect,
                        eos,
                        fut,
                        body_tx,
                        body,
                        cb,
                    };

                    // Check readiness again after sending. If the handle is
                    // not ready, park this request and resume it at the top
                    // of the loop on a later poll.
                    match self.h2_tx.poll_ready(cx) {
                        Poll::Pending => {
                            self.fut_ctx = Some(f);
                            return Poll::Pending;
                        }
                        Poll::Ready(Ok(())) => (),
                        Poll::Ready(Err(err)) => {
                            f.cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    }
                    self.poll_pipe(f, cx);
                    continue;
                }

                Poll::Ready(None) => {
                    trace!("client::dispatch::Sender dropped");
                    return Poll::Ready(Ok(Dispatched::Shutdown));
                }

                // No request is waiting: check whether the connection task
                // has gone away.
                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
                    // `Infallible` has no values, so this arm can never run.
                    // NOTE(review): the `allow` presumably silences a lint on
                    // compilers that flag the arm; confirm against the MSRV.
                    #[allow(unused)]
                    Ok(never) => match never {},
                    Err(_conn_is_eof) => {
                        trace!("connection task is closed, closing dispatch task");
                        return Poll::Ready(Ok(Dispatched::Shutdown));
                    }
                },
            }
        }
    }
}