1pub(crate) mod client;
4pub(crate) mod ping;
5
6use std::{
7 future::Future,
8 io::{self, Cursor, IoSlice},
9 pin::Pin,
10 task::{ready, Context, Poll},
11 time::Duration,
12};
13
14use bytes::{Buf, Bytes};
15use http::{
16 header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE},
17 HeaderMap,
18};
19pub use http2::frame::{
20 Priorities, PrioritiesBuilder, Priority, PseudoId, PseudoOrder, Setting, SettingId,
21 SettingsOrder, SettingsOrderBuilder, StreamDependency, StreamId,
22};
23use http2::{Reason, RecvStream, SendStream};
24use http_body::Body;
25use pin_project_lite::pin_project;
26use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
27
28use crate::{error::BoxError, Error, Result};
29
/// Initial flow-control window size defined by the HTTP/2 specification: 65,535 bytes.
///
/// `Http2OptionsBuilder::adaptive_window(true)` resets both window sizes to this value.
const SPEC_WINDOW_SIZE: u32 = 65_535;

/// Default for `Http2Options::initial_conn_window_size`: 5 MiB.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024;

/// Default for `Http2Options::initial_window_size`: 2 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024;

/// Default for `Http2Options::max_frame_size`: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;

/// Default for `Http2Options::max_send_buffer_size`: 1 MiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024;

/// Default for `Http2Options::max_header_list_size`: 16 KiB.
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024;

/// Default for `Http2Options::initial_max_send_streams`.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
50
/// Header names that `strip_connection_headers` removes unconditionally.
///
/// `TE` and `Connection` are not listed here because that function handles them with
/// dedicated logic. The array order is the order in which removals (and their warnings)
/// happen.
static CONNECTION_HEADERS: [HeaderName; 4] = [
    HeaderName::from_static("keep-alive"),
    HeaderName::from_static("proxy-connection"),
    TRANSFER_ENCODING,
    UPGRADE,
];
61
62fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
63 for header in &CONNECTION_HEADERS {
64 if headers.remove(header).is_some() {
65 warn!("Connection header illegal in HTTP/2: {}", header.as_str());
66 }
67 }
68
69 if is_request {
70 if headers
71 .get(TE)
72 .is_some_and(|te_header| te_header != "trailers")
73 {
74 warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
75 headers.remove(TE);
76 }
77 } else if headers.remove(TE).is_some() {
78 warn!("TE headers illegal in HTTP/2 responses");
79 }
80
81 if let Some(header) = headers.remove(CONNECTION) {
82 warn!(
83 "Connection header illegal in HTTP/2: {}",
84 CONNECTION.as_str()
85 );
86
87 if let Ok(header_contents) = header.to_str() {
88 for name in header_contents.split(',') {
95 let name = name.trim();
96 headers.remove(name);
97 }
98 }
99 }
100}
101
pin_project! {
    /// Future that forwards the frames produced by a `Body` into an HTTP/2 send stream.
    pub(crate) struct PipeToSendStream<S>
    where
        S: Body,
    {
        // Source body; structurally pinned so it can be polled through the projection.
        #[pin]
        stream: S,
        // Send stream that receives the body's data frames and trailers.
        body_tx: SendStream<SendBuf<S::Data>>,
        // Set to `false` by `new`; nothing in this section of the file reads it.
        data_done: bool,
    }
}
114
115impl<S> PipeToSendStream<S>
116where
117 S: Body,
118{
119 #[inline]
120 fn new(stream: S, body_tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
121 PipeToSendStream {
122 stream,
123 body_tx,
124 data_done: false,
125 }
126 }
127
128 #[inline]
129 fn send_reset(self: Pin<&mut Self>, reason: http2::Reason) {
130 self.project().body_tx.send_reset(reason);
131 }
132}
133
impl<S> Future for PipeToSendStream<S>
where
    S: Body,
    S::Error: Into<BoxError>,
{
    type Output = Result<()>;

    /// Pumps frames from the body into the send stream until the body is finished.
    ///
    /// Resolves to `Ok(())` after a final data frame, trailers, or an explicit empty
    /// end-of-stream frame has been handed to the send stream. Resolves to an error when
    /// waiting for capacity fails, the stream is reset, sending fails, or the body itself
    /// yields an error (in which case the stream is reset via `on_user_err`).
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut me = self.project();
        loop {
            // The size of the next chunk is not known before the body is polled, so only a
            // single byte of capacity is requested here.
            me.body_tx.reserve_capacity(1);

            if me.body_tx.capacity() == 0 {
                // Nothing available yet: wait until `poll_capacity` reports a non-zero
                // amount. Zero-sized notifications are skipped and polled again.
                loop {
                    match ready!(me.body_tx.poll_capacity(cx)) {
                        Some(Ok(0)) => {}
                        Some(Ok(_)) => break,
                        Some(Err(e)) => {
                            return Poll::Ready(Err(Error::new_body_write(e)));
                        }
                        None => {
                            // `poll_capacity` ended without granting capacity; this is
                            // surfaced as a body-write error.
                            return Poll::Ready(Err(Error::new_body_write(
                                "send stream capacity unexpectedly closed",
                            )));
                        }
                    }
                }
            } else if let Poll::Ready(reason) =
                me.body_tx.poll_reset(cx).map_err(Error::new_body_write)?
            {
                // Capacity was already available, so check for a reset instead. A pending
                // `poll_reset` simply falls through to polling the body below.
                debug!("stream received RST_STREAM: {:?}", reason);
                return Poll::Ready(Err(Error::new_body_write(::http2::Error::from(reason))));
            }

            match ready!(me.stream.as_mut().poll_frame(cx)) {
                Some(Ok(frame)) => {
                    if frame.is_data() {
                        // `is_data()` was just checked, so `into_data()` cannot fail here.
                        let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
                        // Ask the body whether it is finished so the flag can be passed
                        // along with this very data frame.
                        let is_eos = me.stream.is_end_stream();
                        trace!(
                            "send body chunk: {} bytes, eos={}",
                            chunk.remaining(),
                            is_eos,
                        );

                        let buf = SendBuf::Buf(chunk);
                        me.body_tx
                            .send_data(buf, is_eos)
                            .map_err(Error::new_body_write)?;

                        if is_eos {
                            return Poll::Ready(Ok(()));
                        }
                    } else if frame.is_trailers() {
                        // No further data will be sent after trailers, so the capacity
                        // reservation is lowered to zero before sending them.
                        me.body_tx.reserve_capacity(0);
                        me.body_tx
                            .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!()))
                            .map_err(Error::new_body_write)?;
                        return Poll::Ready(Ok(()));
                    } else {
                        // Neither data nor trailers: drop the frame and loop again.
                        trace!("discarding unknown frame");
                    }
                }
                Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
                None => {
                    // The body ended without an end-of-stream data frame or trailers
                    // having been sent, so finish with an empty end-of-stream frame.
                    return Poll::Ready(me.body_tx.send_eos_frame());
                }
            }
        }
    }
}
216
217trait SendStreamExt {
218 fn on_user_err<E>(&mut self, err: E) -> Error
219 where
220 E: Into<BoxError>;
221
222 fn send_eos_frame(&mut self) -> Result<()>;
223}
224
225impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
226 fn on_user_err<E>(&mut self, err: E) -> Error
227 where
228 E: Into<BoxError>,
229 {
230 let err = Error::new_user_body(err);
231 debug!("send body user stream error: {}", err);
232 self.send_reset(err.h2_reason());
233 err
234 }
235
236 fn send_eos_frame(&mut self) -> Result<()> {
237 trace!("send body eos");
238 self.send_data(SendBuf::None, true)
239 .map_err(Error::new_body_write)
240 }
241}
242
/// Buffer type handed to the HTTP/2 send stream.
///
/// Its `Buf` implementation delegates to whichever variant is active.
#[repr(usize)]
enum SendBuf<B> {
    /// A chunk taken from a body.
    Buf(B),
    /// Owned bytes copied from a caller's slice (used by the `H2Upgraded` writer).
    Cursor(Cursor<Box<[u8]>>),
    /// No payload; reports zero remaining bytes.
    None,
}
249
250impl<B: Buf> Buf for SendBuf<B> {
251 #[inline]
252 fn remaining(&self) -> usize {
253 match *self {
254 Self::Buf(ref b) => b.remaining(),
255 Self::Cursor(ref c) => Buf::remaining(c),
256 Self::None => 0,
257 }
258 }
259
260 #[inline]
261 fn chunk(&self) -> &[u8] {
262 match *self {
263 Self::Buf(ref b) => b.chunk(),
264 Self::Cursor(ref c) => c.chunk(),
265 Self::None => &[],
266 }
267 }
268
269 #[inline]
270 fn advance(&mut self, cnt: usize) {
271 match *self {
272 Self::Buf(ref mut b) => b.advance(cnt),
273 Self::Cursor(ref mut c) => c.advance(cnt),
274 Self::None => {}
275 }
276 }
277
278 fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
279 match *self {
280 Self::Buf(ref b) => b.chunks_vectored(dst),
281 Self::Cursor(ref c) => c.chunks_vectored(dst),
282 Self::None => 0,
283 }
284 }
285}
286
/// Byte-stream adapter over one HTTP/2 stream: implements `AsyncRead` on top of the
/// receive half and `AsyncWrite` on top of the send half.
struct H2Upgraded<B>
where
    B: Buf,
{
    /// Receives the length of every data chunk that is read.
    ping: ping::Recorder,
    /// Send half; writes are copied into `SendBuf::Cursor` frames.
    send_stream: SendStream<SendBuf<B>>,
    /// Receive half that `poll_read` pulls data chunks from.
    recv_stream: RecvStream,
    /// Bytes of the most recent chunk that have not been handed to the reader yet.
    buf: Bytes,
}
296
297impl<B> AsyncRead for H2Upgraded<B>
298where
299 B: Buf,
300{
301 fn poll_read(
302 mut self: Pin<&mut Self>,
303 cx: &mut Context<'_>,
304 read_buf: &mut ReadBuf<'_>,
305 ) -> Poll<io::Result<()>> {
306 if self.buf.is_empty() {
307 self.buf = loop {
308 match ready!(self.recv_stream.poll_data(cx)) {
309 None => return Poll::Ready(Ok(())),
310 Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => {
311 continue;
312 }
313 Some(Ok(buf)) => {
314 self.ping.record_data(buf.len());
315 break buf;
316 }
317 Some(Err(e)) => {
318 return Poll::Ready(match e.reason() {
319 Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()),
320 Some(Reason::STREAM_CLOSED) => {
321 Err(io::Error::new(io::ErrorKind::BrokenPipe, e))
322 }
323 _ => Err(h2_to_io_error(e)),
324 });
325 }
326 }
327 };
328 }
329 let cnt = std::cmp::min(self.buf.len(), read_buf.remaining());
330 read_buf.put_slice(&self.buf[..cnt]);
331 self.buf.advance(cnt);
332 let _ = self.recv_stream.flow_control().release_capacity(cnt);
333 Poll::Ready(Ok(()))
334 }
335}
336
337impl<B> AsyncWrite for H2Upgraded<B>
338where
339 B: Buf,
340{
341 fn poll_write(
342 mut self: Pin<&mut Self>,
343 cx: &mut Context<'_>,
344 buf: &[u8],
345 ) -> Poll<io::Result<usize>> {
346 if buf.is_empty() {
347 return Poll::Ready(Ok(0));
348 }
349 self.send_stream.reserve_capacity(buf.len());
350
351 let cnt = match ready!(self.send_stream.poll_capacity(cx)) {
354 None => Some(0),
355 Some(Ok(cnt)) => self
356 .send_stream
357 .send_data(SendBuf::Cursor(Cursor::new(buf[..cnt].into())), false)
358 .ok()
359 .map(|()| cnt),
360 Some(Err(_)) => None,
361 };
362
363 if let Some(cnt) = cnt {
364 return Poll::Ready(Ok(cnt));
365 }
366
367 Poll::Ready(Err(h2_to_io_error(
368 match ready!(self.send_stream.poll_reset(cx)) {
369 Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
370 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
371 }
372 Ok(reason) => reason.into(),
373 Err(e) => e,
374 },
375 )))
376 }
377
378 #[inline]
379 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
380 Poll::Ready(Ok(()))
381 }
382
383 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
384 if self
385 .send_stream
386 .send_data(SendBuf::Cursor(Cursor::new([].into())), true)
387 .is_ok()
388 {
389 return Poll::Ready(Ok(()));
390 }
391
392 Poll::Ready(Err(h2_to_io_error(
393 match ready!(self.send_stream.poll_reset(cx)) {
394 Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())),
395 Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
396 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
397 }
398 Ok(reason) => reason.into(),
399 Err(e) => e,
400 },
401 )))
402 }
403}
404
405fn h2_to_io_error(e: http2::Error) -> std::io::Error {
406 if e.is_io() {
407 e.into_io()
408 .expect("[BUG] http2::Error::is_io() is true, but into_io() failed")
409 } else {
410 std::io::Error::other(e)
411 }
412}
413
/// Builder for [`Http2Options`].
///
/// Obtained from [`Http2Options::builder`]; every setter consumes and returns the builder,
/// and [`Http2OptionsBuilder::build`] yields the finished options.
#[must_use]
#[derive(Debug)]
pub struct Http2OptionsBuilder {
    /// Options accumulated so far.
    opts: Http2Options,
}
420
/// Configuration values for an HTTP/2 connection.
///
/// The defaults quoted on the fields are those of the `Default` implementation. Note that
/// [`Http2Options::builder`] starts from those defaults but clears `max_frame_size` and
/// `max_header_list_size` to `None`.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Http2Options {
    /// Adaptive window flag. Default: `false`. The builder clears it whenever an explicit
    /// window size is set.
    pub adaptive_window: bool,

    /// Optional initial stream id. Default: `None`.
    pub initial_stream_id: Option<u32>,

    /// Initial connection-level window size. Default: 5 MiB.
    pub initial_conn_window_size: u32,

    /// Initial stream-level window size. Default: 2 MiB.
    pub initial_window_size: u32,

    /// Initial maximum number of send streams. Default: 100.
    pub initial_max_send_streams: usize,

    /// Optional maximum frame size. Default: `Some(16 KiB)`.
    pub max_frame_size: Option<u32>,

    /// Optional keep-alive interval. Default: `None`.
    pub keep_alive_interval: Option<Duration>,

    /// Keep-alive timeout. Default: 20 seconds.
    pub keep_alive_timeout: Duration,

    /// Keep-alive-while-idle flag. Default: `false`.
    pub keep_alive_while_idle: bool,

    /// Optional limit on concurrent reset streams. Default: `None`.
    pub max_concurrent_reset_streams: Option<usize>,

    /// Maximum send buffer size. Default: 1 MiB. The builder rejects values above
    /// `u32::MAX`.
    pub max_send_buffer_size: usize,

    /// Optional limit on concurrent streams. Default: `None`.
    pub max_concurrent_streams: Option<u32>,

    /// Optional maximum header list size. Default: `Some(16 KiB)`.
    pub max_header_list_size: Option<u32>,

    /// Optional limit on pending-accept reset streams. Default: `None`.
    pub max_pending_accept_reset_streams: Option<usize>,

    /// Optional enable-push flag. Default: `None`.
    pub enable_push: Option<bool>,

    /// Optional header table size. Default: `None`.
    pub header_table_size: Option<u32>,

    /// Optional enable-connect-protocol flag. Default: `None`.
    pub enable_connect_protocol: Option<bool>,

    /// Optional no-RFC-7540-priorities flag. Default: `None`.
    pub no_rfc7540_priorities: Option<bool>,

    /// Optional pseudo-header ordering for headers frames. Default: `None`.
    pub headers_pseudo_order: Option<PseudoOrder>,

    /// Optional stream dependency for headers frames. Default: `None`.
    pub headers_stream_dependency: Option<StreamDependency>,

    /// Optional ordering of settings. Default: `None`.
    pub settings_order: Option<SettingsOrder>,

    /// Optional priorities. Default: `None`.
    pub priorities: Option<Priorities>,
}
495
496impl Http2OptionsBuilder {
497 #[inline]
506 pub fn initial_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
507 if let Some(sz) = sz.into() {
508 self.opts.adaptive_window = false;
509 self.opts.initial_window_size = sz;
510 }
511 self
512 }
513
514 #[inline]
520 pub fn initial_connection_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
521 if let Some(sz) = sz.into() {
522 self.opts.adaptive_window = false;
523 self.opts.initial_conn_window_size = sz;
524 }
525 self
526 }
527
528 #[inline]
539 pub fn initial_max_send_streams(mut self, initial: impl Into<Option<usize>>) -> Self {
540 if let Some(initial) = initial.into() {
541 self.opts.initial_max_send_streams = initial;
542 }
543 self
544 }
545
546 #[inline]
548 pub fn initial_stream_id(mut self, id: impl Into<Option<u32>>) -> Self {
549 if let Some(id) = id.into() {
550 self.opts.initial_stream_id = Some(id);
551 }
552 self
553 }
554
555 #[inline]
561 pub fn adaptive_window(mut self, enabled: bool) -> Self {
562 self.opts.adaptive_window = enabled;
563 if enabled {
564 self.opts.initial_window_size = SPEC_WINDOW_SIZE;
565 self.opts.initial_conn_window_size = SPEC_WINDOW_SIZE;
566 }
567 self
568 }
569
570 #[inline]
574 pub fn max_frame_size(mut self, sz: impl Into<Option<u32>>) -> Self {
575 if let Some(sz) = sz.into() {
576 self.opts.max_frame_size = Some(sz);
577 }
578 self
579 }
580
581 #[inline]
585 pub fn max_header_list_size(mut self, max: u32) -> Self {
586 self.opts.max_header_list_size = Some(max);
587 self
588 }
589
590 #[inline]
598 pub fn header_table_size(mut self, size: impl Into<Option<u32>>) -> Self {
599 if let Some(size) = size.into() {
600 self.opts.header_table_size = Some(size);
601 }
602 self
603 }
604
605 #[inline]
629 pub fn max_concurrent_streams(mut self, max: impl Into<Option<u32>>) -> Self {
630 if let Some(max) = max.into() {
631 self.opts.max_concurrent_streams = Some(max);
632 }
633 self
634 }
635
636 #[inline]
643 pub fn keep_alive_interval(mut self, interval: impl Into<Option<Duration>>) -> Self {
644 self.opts.keep_alive_interval = interval.into();
645 self
646 }
647
648 #[inline]
655 pub fn keep_alive_timeout(mut self, timeout: Duration) -> Self {
656 self.opts.keep_alive_timeout = timeout;
657 self
658 }
659
660 #[inline]
669 pub fn keep_alive_while_idle(mut self, enabled: bool) -> Self {
670 self.opts.keep_alive_while_idle = enabled;
671 self
672 }
673
674 #[inline]
678 pub fn enable_push(mut self, opt: bool) -> Self {
679 self.opts.enable_push = Some(opt);
680 self
681 }
682
683 #[inline]
685 pub fn enable_connect_protocol(mut self, opt: bool) -> Self {
686 self.opts.enable_connect_protocol = Some(opt);
687 self
688 }
689
690 #[inline]
693 pub fn no_rfc7540_priorities(mut self, opt: bool) -> Self {
694 self.opts.no_rfc7540_priorities = Some(opt);
695 self
696 }
697
698 #[inline]
707 pub fn max_concurrent_reset_streams(mut self, max: usize) -> Self {
708 self.opts.max_concurrent_reset_streams = Some(max);
709 self
710 }
711
712 #[inline]
720 pub fn max_send_buf_size(mut self, max: usize) -> Self {
721 assert!(max <= u32::MAX as usize);
722 self.opts.max_send_buffer_size = max;
723 self
724 }
725
726 #[inline]
730 pub fn max_pending_accept_reset_streams(mut self, max: impl Into<Option<usize>>) -> Self {
731 if let Some(max) = max.into() {
732 self.opts.max_pending_accept_reset_streams = Some(max);
733 }
734 self
735 }
736
737 #[inline]
743 pub fn headers_stream_dependency<T>(mut self, stream_dependency: T) -> Self
744 where
745 T: Into<Option<StreamDependency>>,
746 {
747 if let Some(stream_dependency) = stream_dependency.into() {
748 self.opts.headers_stream_dependency = Some(stream_dependency);
749 }
750 self
751 }
752
753 #[inline]
759 pub fn headers_pseudo_order<T>(mut self, headers_pseudo_order: T) -> Self
760 where
761 T: Into<Option<PseudoOrder>>,
762 {
763 if let Some(headers_pseudo_order) = headers_pseudo_order.into() {
764 self.opts.headers_pseudo_order = Some(headers_pseudo_order);
765 }
766 self
767 }
768
769 #[inline]
774 pub fn settings_order<T>(mut self, settings_order: T) -> Self
775 where
776 T: Into<Option<SettingsOrder>>,
777 {
778 if let Some(settings_order) = settings_order.into() {
779 self.opts.settings_order = Some(settings_order);
780 }
781 self
782 }
783
784 #[inline]
794 pub fn priorities<T>(mut self, priorities: T) -> Self
795 where
796 T: Into<Option<Priorities>>,
797 {
798 if let Some(priorities) = priorities.into() {
799 self.opts.priorities = Some(priorities);
800 }
801 self
802 }
803
804 #[inline]
806 pub fn build(self) -> Http2Options {
807 self.opts
808 }
809}
810
811impl Http2Options {
812 pub fn builder() -> Http2OptionsBuilder {
814 Http2OptionsBuilder {
818 opts: Http2Options {
819 max_frame_size: None,
820 max_header_list_size: None,
821 ..Default::default()
822 },
823 }
824 }
825}
826
827impl Default for Http2Options {
828 #[inline]
829 fn default() -> Self {
830 Http2Options {
831 adaptive_window: false,
832 initial_stream_id: None,
833 initial_conn_window_size: DEFAULT_CONN_WINDOW,
834 initial_window_size: DEFAULT_STREAM_WINDOW,
835 initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
836 max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
837 max_header_list_size: Some(DEFAULT_MAX_HEADER_LIST_SIZE),
838 keep_alive_interval: None,
839 keep_alive_timeout: Duration::from_secs(20),
840 keep_alive_while_idle: false,
841 max_concurrent_reset_streams: None,
842 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
843 max_pending_accept_reset_streams: None,
844 header_table_size: None,
845 max_concurrent_streams: None,
846 enable_push: None,
847 enable_connect_protocol: None,
848 no_rfc7540_priorities: None,
849 settings_order: None,
850 headers_pseudo_order: None,
851 headers_stream_dependency: None,
852 priorities: None,
853 }
854 }
855}