1use std::{
2 convert::Infallible,
3 future::Future,
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7 time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_util::future::{Either, FusedFuture, FutureExt as _};
15use futures_util::ready;
16use futures_util::stream::{StreamExt as _, StreamFuture};
17use h2::client::{Builder, Connection, SendRequest};
18use h2::SendStream;
19use http::{Method, StatusCode};
20use pin_project_lite::pin_project;
21
22use super::ping::{Ponger, Recorder};
23use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
24use crate::body::{Body, Incoming as IncomingBody};
25use crate::client::dispatch::{Callback, SendWhen, TrySendError};
26use crate::common::io::Compat;
27use crate::common::time::Time;
28use crate::ext::{FramePriority, FrameStreamDependency, Protocol, PseudoType};
29use crate::headers;
30use crate::proto::h2::UpgradedSendStream;
31use crate::proto::Dispatched;
32use crate::rt::bounds::Http2ClientConnExec;
33use crate::upgrade::Upgraded;
34use crate::{Request, Response};
35use h2::client::ResponseFuture;
36
37type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
38
39type ConnDropRef = mpsc::Sender<Infallible>;
42
43type ConnEof = oneshot::Receiver<Infallible>;
46
47const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; const _DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
64
65#[derive(Clone, Debug)]
66pub(crate) struct Config {
67 pub(crate) adaptive_window: bool,
68 pub(crate) initial_conn_window_size: u32,
69 pub(crate) initial_stream_window_size: u32,
70 pub(crate) initial_max_send_streams: usize,
71 pub(crate) max_frame_size: Option<u32>,
72 pub(crate) max_header_list_size: u32,
73 pub(crate) keep_alive_interval: Option<Duration>,
74 pub(crate) keep_alive_timeout: Duration,
75 pub(crate) keep_alive_while_idle: bool,
76 pub(crate) max_concurrent_reset_streams: Option<usize>,
77 pub(crate) max_send_buffer_size: usize,
78 pub(crate) max_pending_accept_reset_streams: Option<usize>,
79 pub(crate) header_table_size: Option<u32>,
80 pub(crate) max_concurrent_streams: Option<u32>,
81
82 pub(crate) enable_push: bool,
84 pub(crate) headers_frame_pseudo_order: Option<&'static [PseudoType; 4]>,
85 pub(crate) headers_frame_priority: Option<FrameStreamDependency>,
86 pub(crate) virtual_streams_priorities: Option<&'static [FramePriority]>,
87}
88
89impl Default for Config {
90 fn default() -> Config {
91 Config {
92 adaptive_window: false,
93 initial_conn_window_size: DEFAULT_CONN_WINDOW,
94 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
95 initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
96 max_frame_size: None,
97 max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
98 keep_alive_interval: None,
99 keep_alive_timeout: Duration::from_secs(20),
100 keep_alive_while_idle: false,
101 max_concurrent_reset_streams: None,
102 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
103 max_pending_accept_reset_streams: None,
104 header_table_size: None,
105 max_concurrent_streams: None,
106
107 enable_push: true,
109 headers_frame_pseudo_order: None,
110 headers_frame_priority: None,
111 virtual_streams_priorities: None,
112 }
113 }
114}
115
116fn new_builder(config: &Config) -> Builder {
117 let mut builder = Builder::default();
118 builder
119 .initial_max_send_streams(config.initial_max_send_streams)
120 .initial_window_size(config.initial_stream_window_size)
121 .initial_connection_window_size(config.initial_conn_window_size)
122 .max_header_list_size(config.max_header_list_size)
123 .max_send_buffer_size(config.max_send_buffer_size)
124 .enable_push(config.enable_push)
125 .headers_frame_pseudo_order(config.headers_frame_pseudo_order)
126 .headers_frame_priority(config.headers_frame_priority)
127 .virtual_streams_priorities(config.virtual_streams_priorities);
128 if let Some(max) = config.max_frame_size {
129 builder.max_frame_size(max);
130 }
131 if let Some(max) = config.max_concurrent_reset_streams {
132 builder.max_concurrent_reset_streams(max);
133 }
134 if let Some(max) = config.max_pending_accept_reset_streams {
135 builder.max_pending_accept_reset_streams(max);
136 }
137 if let Some(size) = config.header_table_size {
138 builder.header_table_size(size);
139 }
140 if let Some(max) = config.max_concurrent_streams {
141 builder.max_concurrent_streams(max);
142 }
143 builder
144}
145
146fn new_ping_config(config: &Config) -> ping::Config {
147 ping::Config {
148 bdp_initial_window: if config.adaptive_window {
149 Some(config.initial_stream_window_size)
150 } else {
151 None
152 },
153 keep_alive_interval: config.keep_alive_interval,
154 keep_alive_timeout: config.keep_alive_timeout,
155 keep_alive_while_idle: config.keep_alive_while_idle,
156 }
157}
158
159pub(crate) async fn handshake<T, B, E>(
160 io: T,
161 req_rx: ClientRx<B>,
162 config: &Config,
163 mut exec: E,
164 timer: Time,
165) -> crate::Result<ClientTask<B, E, T>>
166where
167 T: Read + Write + Unpin,
168 B: Body + 'static,
169 B::Data: Send + 'static,
170 E: Http2ClientConnExec<B, T> + Unpin,
171 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
172{
173 let (h2_tx, mut conn) = new_builder(config)
174 .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
175 .await
176 .map_err(crate::Error::new_h2)?;
177
178 let (conn_drop_ref, rx) = mpsc::channel(1);
183 let (cancel_tx, conn_eof) = oneshot::channel();
184
185 let conn_drop_rx = rx.into_future();
186
187 let ping_config = new_ping_config(config);
188
189 let (conn, ping) = if ping_config.is_enabled() {
190 let pp = conn.ping_pong().expect("conn.ping_pong");
191 let (recorder, ponger) = ping::channel(pp, ping_config, timer);
192
193 let conn: Conn<_, B> = Conn::new(ponger, conn);
194 (Either::Left(conn), recorder)
195 } else {
196 (Either::Right(conn), ping::disabled())
197 };
198 let conn: ConnMapErr<T, B> = ConnMapErr {
199 conn,
200 is_terminated: false,
201 };
202
203 exec.execute_h2_future(H2ClientFuture::Task {
204 task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
205 });
206
207 Ok(ClientTask {
208 ping,
209 conn_drop_ref,
210 conn_eof,
211 executor: exec,
212 h2_tx,
213 req_rx,
214 fut_ctx: None,
215 marker: PhantomData,
216 })
217}
218
219pin_project! {
220 struct Conn<T, B>
221 where
222 B: Body,
223 {
224 #[pin]
225 ponger: Ponger,
226 #[pin]
227 conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
228 }
229}
230
231impl<T, B> Conn<T, B>
232where
233 B: Body,
234 T: Read + Write + Unpin,
235{
236 fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
237 Conn { ponger, conn }
238 }
239}
240
241impl<T, B> Future for Conn<T, B>
242where
243 B: Body,
244 T: Read + Write + Unpin,
245{
246 type Output = Result<(), h2::Error>;
247
248 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
249 let mut this = self.project();
250 match this.ponger.poll(cx) {
251 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
252 this.conn.set_target_window_size(wnd);
253 this.conn.set_initial_window_size(wnd)?;
254 }
255 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
256 debug!("connection keep-alive timed out");
257 return Poll::Ready(Ok(()));
258 }
259 Poll::Pending => {}
260 }
261
262 Pin::new(&mut this.conn).poll(cx)
263 }
264}
265
266pin_project! {
267 struct ConnMapErr<T, B>
268 where
269 B: Body,
270 T: Read,
271 T: Write,
272 T: Unpin,
273 {
274 #[pin]
275 conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
276 #[pin]
277 is_terminated: bool,
278 }
279}
280
281impl<T, B> Future for ConnMapErr<T, B>
282where
283 B: Body,
284 T: Read + Write + Unpin,
285{
286 type Output = Result<(), ()>;
287
288 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
289 let mut this = self.project();
290
291 if *this.is_terminated {
292 return Poll::Pending;
293 }
294 let polled = this.conn.poll(cx);
295 if polled.is_ready() {
296 *this.is_terminated = true;
297 }
298 polled.map_err(|_e| {
299 debug!(error = %_e, "connection error");
300 })
301 }
302}
303
304impl<T, B> FusedFuture for ConnMapErr<T, B>
305where
306 B: Body,
307 T: Read + Write + Unpin,
308{
309 fn is_terminated(&self) -> bool {
310 self.is_terminated
311 }
312}
313
314pin_project! {
315 pub struct ConnTask<T, B>
316 where
317 B: Body,
318 T: Read,
319 T: Write,
320 T: Unpin,
321 {
322 #[pin]
323 drop_rx: StreamFuture<Receiver<Infallible>>,
324 #[pin]
325 cancel_tx: Option<oneshot::Sender<Infallible>>,
326 #[pin]
327 conn: ConnMapErr<T, B>,
328 }
329}
330
331impl<T, B> ConnTask<T, B>
332where
333 B: Body,
334 T: Read + Write + Unpin,
335{
336 fn new(
337 conn: ConnMapErr<T, B>,
338 drop_rx: StreamFuture<Receiver<Infallible>>,
339 cancel_tx: oneshot::Sender<Infallible>,
340 ) -> Self {
341 Self {
342 drop_rx,
343 cancel_tx: Some(cancel_tx),
344 conn,
345 }
346 }
347}
348
349impl<T, B> Future for ConnTask<T, B>
350where
351 B: Body,
352 T: Read + Write + Unpin,
353{
354 type Output = ();
355
356 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
357 let mut this = self.project();
358
359 if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
360 return Poll::Ready(());
362 }
363
364 if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
365 trace!("send_request dropped, starting conn shutdown");
369 drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
370 }
371
372 Poll::Pending
373 }
374}
375
376pin_project! {
377 #[project = H2ClientFutureProject]
378 pub enum H2ClientFuture<B, T>
379 where
380 B: http_body::Body,
381 B: 'static,
382 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
383 T: Read,
384 T: Write,
385 T: Unpin,
386 {
387 Pipe {
388 #[pin]
389 pipe: PipeMap<B>,
390 },
391 Send {
392 #[pin]
393 send_when: SendWhen<B>,
394 },
395 Task {
396 #[pin]
397 task: ConnTask<T, B>,
398 },
399 }
400}
401
402impl<B, T> Future for H2ClientFuture<B, T>
403where
404 B: http_body::Body + 'static,
405 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
406 T: Read + Write + Unpin,
407{
408 type Output = ();
409
410 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
411 let this = self.project();
412
413 match this {
414 H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
415 H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
416 H2ClientFutureProject::Task { task } => task.poll(cx),
417 }
418 }
419}
420
421struct FutCtx<B>
422where
423 B: Body,
424{
425 is_connect: bool,
426 eos: bool,
427 fut: ResponseFuture,
428 body_tx: SendStream<SendBuf<B::Data>>,
429 body: B,
430 cb: Callback<Request<B>, Response<IncomingBody>>,
431}
432
433impl<B: Body> Unpin for FutCtx<B> {}
434
435pub(crate) struct ClientTask<B, E, T>
436where
437 B: Body,
438 E: Unpin,
439{
440 ping: ping::Recorder,
441 conn_drop_ref: ConnDropRef,
442 conn_eof: ConnEof,
443 executor: E,
444 h2_tx: SendRequest<SendBuf<B::Data>>,
445 req_rx: ClientRx<B>,
446 fut_ctx: Option<FutCtx<B>>,
447 marker: PhantomData<T>,
448}
449
450impl<B, E, T> ClientTask<B, E, T>
451where
452 B: Body + 'static,
453 E: Http2ClientConnExec<B, T> + Unpin,
454 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
455 T: Read + Write + Unpin,
456{
457 pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
458 self.h2_tx.is_extended_connect_protocol_enabled()
459 }
460}
461
462pin_project! {
463 pub struct PipeMap<S>
464 where
465 S: Body,
466 {
467 #[pin]
468 pipe: PipeToSendStream<S>,
469 #[pin]
470 conn_drop_ref: Option<Sender<Infallible>>,
471 #[pin]
472 ping: Option<Recorder>,
473 }
474}
475
476impl<B> Future for PipeMap<B>
477where
478 B: http_body::Body,
479 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
480{
481 type Output = ();
482
483 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
484 let mut this = self.project();
485
486 match this.pipe.poll_unpin(cx) {
487 Poll::Ready(result) => {
488 if let Err(_e) = result {
489 debug!("client request body error: {}", _e);
490 }
491 drop(this.conn_drop_ref.take().expect("Future polled twice"));
492 drop(this.ping.take().expect("Future polled twice"));
493 return Poll::Ready(());
494 }
495 Poll::Pending => (),
496 };
497 Poll::Pending
498 }
499}
500
501impl<B, E, T> ClientTask<B, E, T>
502where
503 B: Body + 'static + Unpin,
504 B::Data: Send,
505 E: Http2ClientConnExec<B, T> + Unpin,
506 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
507 T: Read + Write + Unpin,
508{
509 fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
510 let ping = self.ping.clone();
511
512 let send_stream = if !f.is_connect {
513 if !f.eos {
514 let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
515
516 match Pin::new(&mut pipe).poll(cx) {
519 Poll::Ready(_) => (),
520 Poll::Pending => {
521 let conn_drop_ref = self.conn_drop_ref.clone();
522 let ping = ping.clone();
526
527 let pipe = PipeMap {
528 pipe,
529 conn_drop_ref: Some(conn_drop_ref),
530 ping: Some(ping),
531 };
532 self.executor
534 .execute_h2_future(H2ClientFuture::Pipe { pipe });
535 }
536 }
537 }
538
539 None
540 } else {
541 Some(f.body_tx)
542 };
543
544 self.executor.execute_h2_future(H2ClientFuture::Send {
545 send_when: SendWhen {
546 when: ResponseFutMap {
547 fut: f.fut,
548 ping: Some(ping),
549 send_stream: Some(send_stream),
550 },
551 call_back: Some(f.cb),
552 },
553 });
554 }
555}
556
557pin_project! {
558 pub(crate) struct ResponseFutMap<B>
559 where
560 B: Body,
561 B: 'static,
562 {
563 #[pin]
564 fut: ResponseFuture,
565 #[pin]
566 ping: Option<Recorder>,
567 #[pin]
568 send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
569 }
570}
571
572impl<B> Future for ResponseFutMap<B>
573where
574 B: Body + 'static,
575{
576 type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
577
578 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
579 let mut this = self.project();
580
581 let result = ready!(this.fut.poll(cx));
582
583 let ping = this.ping.take().expect("Future polled twice");
584 let send_stream = this.send_stream.take().expect("Future polled twice");
585
586 match result {
587 Ok(res) => {
588 ping.record_non_data();
590
591 let content_length = headers::content_length_parse_all(res.headers());
592 if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
593 if content_length.map_or(false, |len| len != 0) {
594 warn!("h2 connect response with non-zero body not supported");
595
596 send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
597 return Poll::Ready(Err((
598 crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
599 None::<Request<B>>,
600 )));
601 }
602 let (parts, recv_stream) = res.into_parts();
603 let mut res = Response::from_parts(parts, IncomingBody::empty());
604
605 let (pending, on_upgrade) = crate::upgrade::pending();
606 let io = H2Upgraded {
607 ping,
608 send_stream: unsafe { UpgradedSendStream::new(send_stream) },
609 recv_stream,
610 buf: Bytes::new(),
611 };
612 let upgraded = Upgraded::new(io, Bytes::new());
613
614 pending.fulfill(upgraded);
615 res.extensions_mut().insert(on_upgrade);
616
617 Poll::Ready(Ok(res))
618 } else {
619 let res = res.map(|stream| {
620 let ping = ping.for_stream(&stream);
621 IncomingBody::h2(stream, content_length.into(), ping)
622 });
623 Poll::Ready(Ok(res))
624 }
625 }
626 Err(err) => {
627 ping.ensure_not_timed_out().map_err(|e| (e, None))?;
628
629 debug!("client response error: {}", err);
630 Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
631 }
632 }
633 }
634}
635
636impl<B, E, T> Future for ClientTask<B, E, T>
637where
638 B: Body + 'static + Unpin,
639 B::Data: Send,
640 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
641 E: Http2ClientConnExec<B, T> + Unpin,
642 T: Read + Write + Unpin,
643{
644 type Output = crate::Result<Dispatched>;
645
646 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
647 loop {
648 match ready!(self.h2_tx.poll_ready(cx)) {
649 Ok(()) => (),
650 Err(err) => {
651 self.ping.ensure_not_timed_out()?;
652 return if err.reason() == Some(::h2::Reason::NO_ERROR) {
653 trace!("connection gracefully shutdown");
654 Poll::Ready(Ok(Dispatched::Shutdown))
655 } else {
656 Poll::Ready(Err(crate::Error::new_h2(err)))
657 };
658 }
659 };
660
661 if let Some(f) = self.fut_ctx.take() {
664 self.poll_pipe(f, cx);
665 continue;
666 }
667
668 match self.req_rx.poll_recv(cx) {
669 Poll::Ready(Some((req, cb))) => {
670 if cb.is_canceled() {
672 trace!("request callback is canceled");
673 continue;
674 }
675 let (head, body) = req.into_parts();
676 let mut req = ::http::Request::from_parts(head, ());
677 super::strip_connection_headers(req.headers_mut(), true);
678 if let Some(len) = body.size_hint().exact() {
679 if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
680 headers::set_content_length_if_missing(req.headers_mut(), len);
681 }
682 }
683
684 let is_connect = req.method() == Method::CONNECT;
685 let eos = body.is_end_stream();
686
687 if is_connect
688 && headers::content_length_parse_all(req.headers())
689 .map_or(false, |len| len != 0)
690 {
691 warn!("h2 connect request with non-zero body not supported");
692 cb.send(Err(TrySendError {
693 error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
694 message: None,
695 }));
696 continue;
697 }
698
699 if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
700 req.extensions_mut().insert(protocol.into_inner());
701 }
702
703 let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
704 Ok(ok) => ok,
705 Err(err) => {
706 debug!("client send request error: {}", err);
707 cb.send(Err(TrySendError {
708 error: crate::Error::new_h2(err),
709 message: None,
710 }));
711 continue;
712 }
713 };
714
715 let f = FutCtx {
716 is_connect,
717 eos,
718 fut,
719 body_tx,
720 body,
721 cb,
722 };
723
724 match self.h2_tx.poll_ready(cx) {
728 Poll::Pending => {
729 self.fut_ctx = Some(f);
731 return Poll::Pending;
732 }
733 Poll::Ready(Ok(())) => (),
734 Poll::Ready(Err(err)) => {
735 f.cb.send(Err(TrySendError {
736 error: crate::Error::new_h2(err),
737 message: None,
738 }));
739 continue;
740 }
741 }
742 self.poll_pipe(f, cx);
743 continue;
744 }
745
746 Poll::Ready(None) => {
747 trace!("client::dispatch::Sender dropped");
748 return Poll::Ready(Ok(Dispatched::Shutdown));
749 }
750
751 Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
752 #[allow(unused)]
755 Ok(never) => match never {},
756 Err(_conn_is_eof) => {
757 trace!("connection task is closed, closing dispatch task");
758 return Poll::Ready(Ok(Dispatched::Shutdown));
759 }
760 },
761 }
762 }
763 }
764}