1use std::{
2 borrow::Cow, convert::Infallible, future::Future, marker::PhantomData, pin::Pin, task::{Context, Poll}, time::Duration
3};
4
5use crate::rt::{Read, Write};
6use bytes::Bytes;
7use futures_channel::mpsc::{Receiver, Sender};
8use futures_channel::{mpsc, oneshot};
9use futures_util::future::{Either, FusedFuture, FutureExt as _};
10use futures_util::ready;
11use futures_util::stream::{StreamExt as _, StreamFuture};
12use h2::{client::{Builder, Connection, SendRequest}, frame::Priority};
13use h2::SendStream;
14use http::{Method, StatusCode};
15use pin_project_lite::pin_project;
16
17use super::ping::{Ponger, Recorder};
18use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
19use crate::body::{Body, Incoming as IncomingBody};
20use crate::client::dispatch::{Callback, SendWhen, TrySendError};
21use crate::common::io::Compat;
22use crate::common::time::Time;
23use crate::ext::Protocol;
24use crate::headers;
25use crate::proto::h2::UpgradedSendStream;
26use crate::proto::Dispatched;
27use crate::rt::bounds::Http2ClientConnExec;
28use crate::upgrade::Upgraded;
29use crate::{Request, Response};
30use h2::client::ResponseFuture;
31use h2::frame::{PseudoOrder, SettingsOrder, StreamDependency};
32
33
/// Receiving half of the client dispatch channel: each item pairs an outgoing
/// `Request<B>` with the callback used to deliver its `Response<IncomingBody>`
/// (or an error).
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;

/// Sender half of a channel that can never carry a value (`Infallible` has no
/// inhabitants). Clones are held by the dispatch task and by spawned body
/// pipes; the connection task watches the receiving half, which only resolves
/// once every clone has been dropped.
type ConnDropRef = mpsc::Sender<Infallible>;

/// Receiver half of a oneshot that can never carry a value. The connection
/// task owns the sender, so this only resolves (with an error) once that
/// sender has been dropped, which tells the dispatch task to shut down.
type ConnEof = oneshot::Receiver<Infallible>;
43
// Defaults used by `Config::default()` for the flow-control windows and the
// send buffer.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024; // 5 MiB
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024; // 2 MiB
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1 MiB

// Default used by `Config::default()` for `initial_max_send_streams`.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
59
/// HTTP/2 client connection settings.
///
/// The `h2`-level options are applied to an `h2::client::Builder` by
/// `new_builder`; `adaptive_window` and the `keep_alive_*` options are turned
/// into a `ping::Config` by `new_ping_config`. Every `Option` field left as
/// `None` is simply not applied to the builder.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, `new_ping_config` passes `initial_stream_window_size` as
    /// `bdp_initial_window`; otherwise it passes `None`.
    pub(crate) adaptive_window: bool,
    /// Forwarded to `Builder::initial_stream_id` when set.
    pub(crate) initial_stream_id: Option<u32>,
    /// Always forwarded to `Builder::initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Always forwarded to `Builder::initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Always forwarded to `Builder::initial_max_send_streams`.
    pub(crate) initial_max_send_streams: usize,
    /// Forwarded to `Builder::max_frame_size` when set.
    pub(crate) max_frame_size: Option<u32>,
    /// Copied into `ping::Config::keep_alive_interval`.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Copied into `ping::Config::keep_alive_timeout`.
    pub(crate) keep_alive_timeout: Duration,
    /// Copied into `ping::Config::keep_alive_while_idle`.
    pub(crate) keep_alive_while_idle: bool,
    /// Forwarded to `Builder::max_concurrent_reset_streams` when set.
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    /// Always forwarded to `Builder::max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Forwarded to `Builder::max_concurrent_streams` when set.
    pub(crate) max_concurrent_streams: Option<u32>,
    /// Forwarded to `Builder::max_header_list_size` when set.
    pub(crate) max_header_list_size: Option<u32>,
    /// Forwarded to `Builder::max_pending_accept_reset_streams` when set.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Forwarded to `Builder::enable_push` when set.
    pub(crate) enable_push: Option<bool>,
    /// Forwarded to `Builder::header_table_size` when set.
    pub(crate) header_table_size: Option<u32>,
    /// Forwarded to `Builder::unknown_setting8` when set.
    pub(crate) unknown_setting8: Option<bool>,
    /// Forwarded to `Builder::unknown_setting9` when set.
    pub(crate) unknown_setting9: Option<bool>,
    /// Forwarded to `Builder::headers_psuedo` when set.
    pub(crate) headers_pseudo_order: Option<[PseudoOrder; 4]>,
    /// Forwarded to `Builder::headers_priority` when set.
    pub(crate) headers_priority: Option<StreamDependency>,
    /// Forwarded to `Builder::settings_order` when set.
    pub(crate) settings_order: Option<[SettingsOrder; 8]>,
    /// Cloned and forwarded to `Builder::priority` when set.
    pub(crate) priority: Option<Cow<'static, [Priority]>>
}
85
86impl Default for Config {
87 fn default() -> Config {
88 Config {
89 adaptive_window: false,
90 initial_stream_id: None,
91 initial_conn_window_size: DEFAULT_CONN_WINDOW,
92 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
93 initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
94 max_frame_size: None,
95 max_header_list_size: None,
96 keep_alive_interval: None,
97 keep_alive_timeout: Duration::from_secs(20),
98 keep_alive_while_idle: false,
99 max_concurrent_reset_streams: None,
100 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
101 max_pending_accept_reset_streams: None,
102 header_table_size: None,
103 max_concurrent_streams: None,
104 enable_push: None,
105 unknown_setting8: None,
106 unknown_setting9: None,
107 headers_pseudo_order: None,
108 headers_priority: None,
109 settings_order: None,
110 priority: None,
111 }
112 }
113}
114
115fn new_builder(config: &Config) -> Builder {
116 let mut builder = Builder::default();
117 builder
118 .initial_max_send_streams(config.initial_max_send_streams)
119 .initial_window_size(config.initial_stream_window_size)
120 .initial_connection_window_size(config.initial_conn_window_size)
121 .max_send_buffer_size(config.max_send_buffer_size);
122 if let Some(id) = config.initial_stream_id {
123 builder.initial_stream_id(id);
124 }
125 if let Some(max) = config.max_pending_accept_reset_streams {
126 builder.max_pending_accept_reset_streams(max);
127 }
128 if let Some(max) = config.max_concurrent_reset_streams {
129 builder.max_concurrent_reset_streams(max);
130 }
131 if let Some(max) = config.max_concurrent_streams {
132 builder.max_concurrent_streams(max);
133 }
134 if let Some(max) = config.max_header_list_size {
135 builder.max_header_list_size(max);
136 }
137 if let Some(opt) = config.enable_push {
138 builder.enable_push(opt);
139 }
140 if let Some(max) = config.max_frame_size {
141 builder.max_frame_size(max);
142 }
143 if let Some(max) = config.header_table_size {
144 builder.header_table_size(max);
145 }
146 if let Some(v) = config.unknown_setting8 {
147 builder.unknown_setting8(v);
148 }
149 if let Some(v) = config.unknown_setting9 {
150 builder.unknown_setting9(v);
151 }
152 if let Some(priority) = config.headers_priority {
153 builder.headers_priority(priority);
154 }
155 if let Some(order) = config.headers_pseudo_order {
156 builder.headers_psuedo(order);
157 }
158 if let Some(order) = config.settings_order {
159 builder.settings_order(order);
160 }
161 if let Some(ref priority) = config.priority {
162 builder.priority(priority.clone());
163 }
164 builder
165}
166
167fn new_ping_config(config: &Config) -> ping::Config {
168 ping::Config {
169 bdp_initial_window: if config.adaptive_window {
170 Some(config.initial_stream_window_size)
171 } else {
172 None
173 },
174 keep_alive_interval: config.keep_alive_interval,
175 keep_alive_timeout: config.keep_alive_timeout,
176 keep_alive_while_idle: config.keep_alive_while_idle,
177 }
178}
179
180pub(crate) async fn handshake<T, B, E>(
181 io: T,
182 req_rx: ClientRx<B>,
183 config: &Config,
184 mut exec: E,
185 timer: Time,
186) -> crate::Result<ClientTask<B, E, T>>
187where
188 T: Read + Write + Unpin,
189 B: Body + 'static,
190 B::Data: Send + 'static,
191 E: Http2ClientConnExec<B, T> + Unpin,
192 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
193{
194 let (h2_tx, mut conn) = new_builder(config)
195 .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
196 .await
197 .map_err(crate::Error::new_h2)?;
198
199 let (conn_drop_ref, rx) = mpsc::channel(1);
204 let (cancel_tx, conn_eof) = oneshot::channel();
205
206 let conn_drop_rx = rx.into_future();
207
208 let ping_config = new_ping_config(config);
209
210 let (conn, ping) = if ping_config.is_enabled() {
211 let pp = conn.ping_pong().expect("conn.ping_pong");
212 let (recorder, ponger) = ping::channel(pp, ping_config, timer);
213
214 let conn: Conn<_, B> = Conn::new(ponger, conn);
215 (Either::Left(conn), recorder)
216 } else {
217 (Either::Right(conn), ping::disabled())
218 };
219 let conn: ConnMapErr<T, B> = ConnMapErr {
220 conn,
221 is_terminated: false,
222 };
223
224 exec.execute_h2_future(H2ClientFuture::Task {
225 task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
226 });
227
228 Ok(ClientTask {
229 ping,
230 conn_drop_ref,
231 conn_eof,
232 executor: exec,
233 h2_tx,
234 req_rx,
235 fut_ctx: None,
236 marker: PhantomData,
237 })
238}
239
pin_project! {
    /// An h2 client connection paired with the `Ponger` that is polled
    /// alongside it.
    struct Conn<T, B>
    where
        B: Body,
    {
        // Polled before the connection on every `poll`; its results may resize
        // the windows or end the connection.
        #[pin]
        ponger: Ponger,
        // The underlying h2 connection future.
        #[pin]
        conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
    }
}
251
252impl<T, B> Conn<T, B>
253where
254 B: Body,
255 T: Read + Write + Unpin,
256{
257 fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
258 Conn { ponger, conn }
259 }
260}
261
262impl<T, B> Future for Conn<T, B>
263where
264 B: Body,
265 T: Read + Write + Unpin,
266{
267 type Output = Result<(), h2::Error>;
268
269 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
270 let mut this = self.project();
271 match this.ponger.poll(cx) {
272 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
273 this.conn.set_target_window_size(wnd);
274 this.conn.set_initial_window_size(wnd)?;
275 }
276 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
277 debug!("connection keep-alive timed out");
278 return Poll::Ready(Ok(()));
279 }
280 Poll::Pending => {}
281 }
282
283 Pin::new(&mut this.conn).poll(cx)
284 }
285}
286
pin_project! {
    /// Wraps the connection future (with or without ping handling), logging
    /// and discarding its error, and remembering whether it has completed.
    struct ConnMapErr<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // `Left` when ping handling is enabled, `Right` for the bare h2
        // connection.
        #[pin]
        conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
        // Set once `conn` has returned `Poll::Ready`; reported through
        // `FusedFuture::is_terminated`.
        #[pin]
        is_terminated: bool,
    }
}
301
302impl<T, B> Future for ConnMapErr<T, B>
303where
304 B: Body,
305 T: Read + Write + Unpin,
306{
307 type Output = Result<(), ()>;
308
309 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
310 let mut this = self.project();
311
312 if *this.is_terminated {
313 return Poll::Pending;
314 }
315 let polled = this.conn.poll(cx);
316 if polled.is_ready() {
317 *this.is_terminated = true;
318 }
319 polled.map_err(|_e| {
320 debug!(error = %_e, "connection error");
321 })
322 }
323}
324
impl<T, B> FusedFuture for ConnMapErr<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    /// Returns `true` once the wrapped connection future has completed, i.e.
    /// after `poll` has returned `Poll::Ready` once.
    fn is_terminated(&self) -> bool {
        self.is_terminated
    }
}
334
pin_project! {
    /// The future handed to the executor to drive the h2 connection.
    ///
    /// It completes when the connection completes. It also watches for all
    /// `ConnDropRef` senders being dropped, at which point it drops
    /// `cancel_tx`.
    pub struct ConnTask<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Resolves once every `ConnDropRef` sender has been dropped (no value
        // can ever be sent on the channel).
        #[pin]
        drop_rx: StreamFuture<Receiver<Infallible>>,
        // Sender paired with the dispatch task's `ConnEof`; taken and dropped
        // when `drop_rx` resolves.
        #[pin]
        cancel_tx: Option<oneshot::Sender<Infallible>>,
        // The connection future being driven.
        #[pin]
        conn: ConnMapErr<T, B>,
    }
}
351
352impl<T, B> ConnTask<T, B>
353where
354 B: Body,
355 T: Read + Write + Unpin,
356{
357 fn new(
358 conn: ConnMapErr<T, B>,
359 drop_rx: StreamFuture<Receiver<Infallible>>,
360 cancel_tx: oneshot::Sender<Infallible>,
361 ) -> Self {
362 Self {
363 drop_rx,
364 cancel_tx: Some(cancel_tx),
365 conn,
366 }
367 }
368}
369
370impl<T, B> Future for ConnTask<T, B>
371where
372 B: Body,
373 T: Read + Write + Unpin,
374{
375 type Output = ();
376
377 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
378 let mut this = self.project();
379
380 if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
381 return Poll::Ready(());
383 }
384
385 if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
386 trace!("send_request dropped, starting conn shutdown");
390 drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
391 }
392
393 Poll::Pending
394 }
395}
396
pin_project! {
    #[project = H2ClientFutureProject]
    /// The single future type this module hands to the executor; each variant
    /// wraps one kind of background work and resolves to `()`.
    pub enum H2ClientFuture<B, T>
    where
        B: http_body::Body,
        B: 'static,
        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Streams a request body into its h2 send stream.
        Pipe {
            #[pin]
            pipe: PipeMap<B>,
        },
        // Waits for a response and delivers it through the request callback.
        Send {
            #[pin]
            send_when: SendWhen<B>,
        },
        // Drives the h2 connection itself.
        Task {
            #[pin]
            task: ConnTask<T, B>,
        },
    }
}
422
423impl<B, T> Future for H2ClientFuture<B, T>
424where
425 B: http_body::Body + 'static,
426 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
427 T: Read + Write + Unpin,
428{
429 type Output = ();
430
431 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
432 let this = self.project();
433
434 match this {
435 H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
436 H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
437 H2ClientFutureProject::Task { task } => task.poll(cx),
438 }
439 }
440}
441
/// Everything needed to finish dispatching one request after
/// `SendRequest::send_request` has been called.
///
/// `ClientTask::poll` parks a value of this type in `fut_ctx` when the h2
/// sender is not ready yet, and passes it to `poll_pipe` once it is.
struct FutCtx<B>
where
    B: Body,
{
    /// Whether the request method is `CONNECT`.
    is_connect: bool,
    /// Whether the body reported end-of-stream before anything was sent.
    eos: bool,
    /// The h2 future that resolves to the response.
    fut: ResponseFuture,
    /// The h2 stream the request body is written to.
    body_tx: SendStream<SendBuf<B::Data>>,
    /// The request body.
    body: B,
    /// Callback used to deliver the response (or an error) to the caller.
    cb: Callback<Request<B>, Response<IncomingBody>>,
}

// Declared `Unpin` for every body type, without requiring `B: Unpin`.
impl<B: Body> Unpin for FutCtx<B> {}
455
/// The dispatch task of an HTTP/2 client connection.
///
/// It receives requests from `req_rx`, opens an h2 stream for each one, and
/// spawns the body-pipe and response futures on `executor`.
pub(crate) struct ClientTask<B, E, T>
where
    B: Body,
    E: Unpin,
{
    /// Ping recorder; cloned into every spawned body pipe and response future.
    ping: ping::Recorder,
    /// Cloned into every spawned body pipe; the connection task sees its
    /// watcher resolve only once all clones are gone.
    conn_drop_ref: ConnDropRef,
    /// Resolves (with an error) once the connection task drops its sender.
    conn_eof: ConnEof,
    /// Executor used to spawn `H2ClientFuture`s.
    executor: E,
    /// The h2 handle used to open new request streams.
    h2_tx: SendRequest<SendBuf<B::Data>>,
    /// Source of requests to dispatch.
    req_rx: ClientRx<B>,
    /// A request whose stream has been opened but whose pipe/response futures
    /// have not been spawned yet, because `h2_tx` was not ready.
    fut_ctx: Option<FutCtx<B>>,
    /// Ties the otherwise unused IO type `T` to this task.
    marker: PhantomData<T>,
}
470
impl<B, E, T> ClientTask<B, E, T>
where
    B: Body + 'static,
    E: Http2ClientConnExec<B, T> + Unpin,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    T: Read + Write + Unpin,
{
    /// Returns whatever the underlying h2 `SendRequest` handle reports for
    /// `is_extended_connect_protocol_enabled`.
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
        self.h2_tx.is_extended_connect_protocol_enabled()
    }
}
482
pin_project! {
    /// A request-body pipe that is spawned on the executor, bundled with the
    /// handles it keeps alive until the pipe finishes.
    pub struct PipeMap<S>
    where
        S: Body,
    {
        // Streams the body into the h2 send stream.
        #[pin]
        pipe: PipeToSendStream<S>,
        // Dropped when the pipe completes, releasing this task's hold on the
        // connection-drop channel.
        #[pin]
        conn_drop_ref: Option<Sender<Infallible>>,
        // Ping recorder clone, dropped when the pipe completes.
        #[pin]
        ping: Option<Recorder>,
    }
}
496
497impl<B> Future for PipeMap<B>
498where
499 B: http_body::Body,
500 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
501{
502 type Output = ();
503
504 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
505 let mut this = self.project();
506
507 match this.pipe.poll_unpin(cx) {
508 Poll::Ready(result) => {
509 if let Err(_e) = result {
510 debug!("client request body error: {}", _e);
511 }
512 drop(this.conn_drop_ref.take().expect("Future polled twice"));
513 drop(this.ping.take().expect("Future polled twice"));
514 return Poll::Ready(());
515 }
516 Poll::Pending => (),
517 };
518 Poll::Pending
519 }
520}
521
522impl<B, E, T> ClientTask<B, E, T>
523where
524 B: Body + 'static + Unpin,
525 B::Data: Send,
526 E: Http2ClientConnExec<B, T> + Unpin,
527 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
528 T: Read + Write + Unpin,
529{
530 fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
531 let ping = self.ping.clone();
532
533 let send_stream = if !f.is_connect {
534 if !f.eos {
535 let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
536
537 match Pin::new(&mut pipe).poll(cx) {
540 Poll::Ready(_) => (),
541 Poll::Pending => {
542 let conn_drop_ref = self.conn_drop_ref.clone();
543 let ping = ping.clone();
547
548 let pipe = PipeMap {
549 pipe,
550 conn_drop_ref: Some(conn_drop_ref),
551 ping: Some(ping),
552 };
553 self.executor
555 .execute_h2_future(H2ClientFuture::Pipe { pipe });
556 }
557 }
558 }
559
560 None
561 } else {
562 Some(f.body_tx)
563 };
564
565 self.executor.execute_h2_future(H2ClientFuture::Send {
566 send_when: SendWhen {
567 when: ResponseFutMap {
568 fut: f.fut,
569 ping: Some(ping),
570 send_stream: Some(send_stream),
571 },
572 call_back: Some(f.cb),
573 },
574 });
575 }
576}
577
pin_project! {
    /// Wraps an h2 `ResponseFuture`, converting its result into a hyper
    /// `Response<Incoming>` or a hyper error.
    pub(crate) struct ResponseFutMap<B>
    where
        B: Body,
        B: 'static,
    {
        // The h2 future that resolves to the response head and receive stream.
        #[pin]
        fut: ResponseFuture,
        // Ping recorder; taken when `fut` resolves.
        #[pin]
        ping: Option<Recorder>,
        // Outer `Option`: taken when `fut` resolves. Inner `Option`: holds the
        // send stream only for CONNECT requests (see `poll_pipe`).
        #[pin]
        send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
    }
}
592
impl<B> Future for ResponseFutMap<B>
where
    B: Body + 'static,
{
    type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;

    /// Waits for the h2 response and converts it.
    ///
    /// * A `200 OK` response to a request that kept its send stream (a
    ///   CONNECT request) becomes an empty-bodied response carrying an
    ///   already-fulfilled upgrade in its extensions; if that response
    ///   declares a non-zero content length the stream is reset and an
    ///   `INTERNAL_ERROR` is returned instead.
    /// * Any other response gets an `IncomingBody::h2` body.
    /// * An h2 error is returned as a hyper error, unless the ping recorder
    ///   reports an error of its own first.
    ///
    /// The `Option<Request<B>>` half of the error is always `None` here.
    ///
    /// # Panics
    ///
    /// Panics if polled again after the inner future has resolved.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        let result = ready!(this.fut.poll(cx));

        // Both are taken exactly once, right after the response future resolves.
        let ping = this.ping.take().expect("Future polled twice");
        let send_stream = this.send_stream.take().expect("Future polled twice");

        match result {
            Ok(res) => {
                ping.record_non_data();

                let content_length = headers::content_length_parse_all(res.headers());
                // `send_stream` is `Some` only for CONNECT requests; together
                // with a 200 status this selects the upgrade path.
                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
                    if content_length.map_or(false, |len| len != 0) {
                        warn!("h2 connect response with non-zero body not supported");

                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
                        return Poll::Ready(Err((
                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
                            None::<Request<B>>,
                        )));
                    }
                    // The receive stream moves into the upgraded IO, so the
                    // response itself carries an empty body.
                    let (parts, recv_stream) = res.into_parts();
                    let mut res = Response::from_parts(parts, IncomingBody::empty());

                    let (pending, on_upgrade) = crate::upgrade::pending();
                    let io = H2Upgraded {
                        ping,
                        // NOTE(review): the safety contract of
                        // `UpgradedSendStream::new` is defined outside this
                        // file — confirm it is upheld here.
                        send_stream: unsafe { UpgradedSendStream::new(send_stream) },
                        recv_stream,
                        buf: Bytes::new(),
                    };
                    let upgraded = Upgraded::new(io, Bytes::new());

                    // Fulfilled immediately; the handle is exposed through the
                    // response extensions.
                    pending.fulfill(upgraded);
                    res.extensions_mut().insert(on_upgrade);

                    Poll::Ready(Ok(res))
                } else {
                    let res = res.map(|stream| {
                        let ping = ping.for_stream(&stream);
                        IncomingBody::h2(stream, content_length.into(), ping)
                    });
                    Poll::Ready(Ok(res))
                }
            }
            Err(err) => {
                // An error from the ping recorder takes precedence over the h2
                // error (presumably a keep-alive timeout — confirm in `ping`).
                ping.ensure_not_timed_out().map_err(|e| (e, None))?;

                debug!("client response error: {}", err);
                Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
            }
        }
    }
}
656
impl<B, E, T> Future for ClientTask<B, E, T>
where
    B: Body + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    E: Http2ClientConnExec<B, T> + Unpin,
    T: Read + Write + Unpin,
{
    type Output = crate::Result<Dispatched>;

    /// Runs the dispatch loop: waits for the h2 sender to be ready, resumes a
    /// parked request if there is one, and otherwise pulls the next request
    /// from `req_rx` and opens an h2 stream for it.
    ///
    /// Resolves with `Dispatched::Shutdown` when the h2 connection closes with
    /// `NO_ERROR`, when the request channel is closed, or when the connection
    /// task has gone away; resolves with an error when the h2 sender fails for
    /// any other reason or the ping recorder reports an error.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            // Nothing is dispatched until the h2 sender reports ready.
            match ready!(self.h2_tx.poll_ready(cx)) {
                Ok(()) => (),
                Err(err) => {
                    // An error from the ping recorder takes precedence over
                    // the h2 error (presumably a keep-alive timeout — confirm
                    // in `ping`).
                    self.ping.ensure_not_timed_out()?;
                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
                        trace!("connection gracefully shutdown");
                        Poll::Ready(Ok(Dispatched::Shutdown))
                    } else {
                        Poll::Ready(Err(crate::Error::new_h2(err)))
                    };
                }
            };

            // Resume a request that was parked because the sender was not
            // ready right after its stream was opened.
            if let Some(f) = self.fut_ctx.take() {
                self.poll_pipe(f, cx);
                continue;
            }

            match self.req_rx.poll_recv(cx) {
                Poll::Ready(Some((req, cb))) => {
                    // Skip requests whose callback has already been canceled.
                    if cb.is_canceled() {
                        trace!("request callback is canceled");
                        continue;
                    }
                    // Split off the body: h2 takes a body-less request and the
                    // body is streamed separately.
                    let (head, body) = req.into_parts();
                    let mut req = ::http::Request::from_parts(head, ());
                    super::strip_connection_headers(req.headers_mut(), true);
                    // Set content-length from an exact size hint, but for a
                    // zero length only when the method has defined payload
                    // semantics.
                    if let Some(len) = body.size_hint().exact() {
                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
                            headers::set_content_length_if_missing(req.headers_mut(), len);
                        }
                    }

                    let is_connect = req.method() == Method::CONNECT;
                    let eos = body.is_end_stream();

                    // CONNECT requests declaring a non-zero content length are
                    // rejected up front.
                    if is_connect
                        && headers::content_length_parse_all(req.headers())
                            .map_or(false, |len| len != 0)
                    {
                        warn!("h2 connect request with non-zero body not supported");
                        cb.send(Err(TrySendError {
                            error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
                            message: None,
                        }));
                        continue;
                    }

                    // Replace hyper's `Protocol` extension with its inner
                    // value before handing the request to h2.
                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
                        req.extensions_mut().insert(protocol.into_inner());
                    }

                    // End the stream immediately only for non-CONNECT requests
                    // whose body is already at end-of-stream.
                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
                        Ok(ok) => ok,
                        Err(err) => {
                            debug!("client send request error: {}", err);
                            cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    };

                    let f = FutCtx {
                        is_connect,
                        eos,
                        fut,
                        body_tx,
                        body,
                        cb,
                    };

                    // Check readiness again after opening the stream; if the
                    // sender is not ready, park the request and resume it at
                    // the top of the loop on a later poll.
                    match self.h2_tx.poll_ready(cx) {
                        Poll::Pending => {
                            self.fut_ctx = Some(f);
                            return Poll::Pending;
                        }
                        Poll::Ready(Ok(())) => (),
                        Poll::Ready(Err(err)) => {
                            f.cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    }
                    self.poll_pipe(f, cx);
                    continue;
                }

                Poll::Ready(None) => {
                    trace!("client::dispatch::Sender dropped");
                    return Poll::Ready(Ok(Dispatched::Shutdown));
                }

                // No request is waiting: check whether the connection task has
                // gone away.
                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
                    // `Infallible` has no values, so this arm can never run.
                    #[allow(unused)]
                    Ok(never) => match never {},
                    Err(_conn_is_eof) => {
                        trace!("connection task is closed, closing dispatch task");
                        return Poll::Ready(Ok(Dispatched::Shutdown));
                    }
                },
            }
        }
    }
}