pub(crate) mod client;
pub(crate) mod ping;
use std::{
future::Future,
io::{self, Cursor, IoSlice},
pin::Pin,
task::{ready, Context, Poll},
time::Duration,
};
use bytes::{Buf, Bytes};
use http::{
header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE},
HeaderMap,
};
pub use http2::frame::{
Priorities, PrioritiesBuilder, Priority, PseudoId, PseudoOrder, Setting, SettingId,
SettingsOrder, SettingsOrderBuilder, StreamDependency, StreamId,
};
use http2::{Reason, RecvStream, SendStream};
use http_body::Body;
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::{error::BoxError, Error, Result};
/// Window size (in bytes) applied to both the stream and the connection
/// window when adaptive windowing is switched on through the builder.
const SPEC_WINDOW_SIZE: u32 = 65_535;

/// Default initial connection-level window size: 5 MiB.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024;

/// Default initial stream-level window size: 2 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024;

/// Default maximum frame size: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;

/// Default maximum send buffer size: 1 MiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024;

/// Default maximum header list size: 16 KiB.
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024;

/// Default value for `Http2Options::initial_max_send_streams`.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
/// Header names that `strip_connection_headers` removes unconditionally from
/// every header map it is given. `Connection` and `TE` are not listed here
/// because that function handles them separately.
static CONNECTION_HEADERS: [HeaderName; 4] = [
HeaderName::from_static("keep-alive"),
HeaderName::from_static("proxy-connection"),
TRANSFER_ENCODING,
UPGRADE,
];
/// Removes connection-specific header fields, which are not allowed in HTTP/2.
///
/// * Every name in `CONNECTION_HEADERS` is removed.
/// * `TE` is removed from responses (`is_request == false`). In requests it
///   is kept only when every `TE` value is exactly `trailers`.
/// * `Connection` is removed, together with every header it nominates in its
///   comma-separated value(s).
///
/// A warning is logged for each category of header that had to be removed.
fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
    for header in &CONNECTION_HEADERS {
        if headers.remove(header).is_some() {
            warn!("Connection header illegal in HTTP/2: {}", header.as_str());
        }
    }

    if is_request {
        // Inspect every `TE` value, not only the first one: a header map may
        // hold several values for the same name, and a later value must not
        // be allowed to carry anything other than "trailers".
        if headers
            .get_all(TE)
            .iter()
            .any(|te_header| te_header != "trailers")
        {
            warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
            headers.remove(TE);
        }
    } else if headers.remove(TE).is_some() {
        warn!("TE headers illegal in HTTP/2 responses");
    }

    if headers.contains_key(CONNECTION) {
        warn!(
            "Connection header illegal in HTTP/2: {}",
            CONNECTION.as_str()
        );

        // `HeaderMap::remove` only hands back the first value of a
        // multi-valued header, so gather the nominated names from *all*
        // `Connection` values before removing the header itself. Values that
        // are not visible ASCII are skipped, as before.
        let nominated: Vec<String> = headers
            .get_all(CONNECTION)
            .iter()
            .filter_map(|value| value.to_str().ok())
            .flat_map(|list| list.split(','))
            .map(|name| name.trim().to_owned())
            .collect();

        headers.remove(CONNECTION);

        // Each nominated name is itself a connection-specific header and must
        // not be forwarded over HTTP/2.
        for name in &nominated {
            headers.remove(name.as_str());
        }
    }
}
/// A data chunk that has been pulled from the body but not yet handed to the
/// send stream (see `PipeToSendStream::buffered_data`).
struct Peeked<D> {
/// The chunk payload.
data: D,
/// Value of the body's `is_end_stream()` taken right after the chunk was
/// polled; forwarded as the end-of-stream flag when the chunk is sent.
is_eos: bool,
}
pin_project! {
// Future that reads frames from the body `S` and forwards them to an
// HTTP/2 send stream until the body ends, trailers are sent, or an error
// occurs.
pub(crate) struct PipeToSendStream<S>
where
S: Body,
{
// The body being drained; pinned because it is polled in place.
#[pin]
stream: S,
// Send half of the HTTP/2 stream the body is written to.
body_tx: SendStream<SendBuf<S::Data>>,
// Initialised to `false` by `new`. NOTE(review): not read anywhere in
// this section of the file; confirm whether it is still needed.
data_done: bool,
// Non-empty data chunk already taken from `stream` that is waiting for
// send capacity before it can be written to `body_tx`.
buffered_data: Option<Peeked<S::Data>>,
}
}
impl<S> PipeToSendStream<S>
where
    S: Body,
{
    /// Creates a pipe that will forward `stream` into `body_tx`.
    ///
    /// No chunk is buffered initially and `data_done` starts out `false`.
    #[inline]
    fn new(stream: S, body_tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
        Self {
            buffered_data: None,
            data_done: false,
            body_tx,
            stream,
        }
    }

    /// Resets the underlying send stream with the given `reason`.
    #[inline]
    fn send_reset(self: Pin<&mut Self>, reason: http2::Reason) {
        let this = self.project();
        this.body_tx.send_reset(reason);
    }
}
// Drives the body to completion, forwarding every frame to the send stream.
//
// Resolves to `Ok(())` once an end-of-stream data frame or the trailers have
// been sent, and to an error when the stream is reset, capacity polling or
// sending fails, or the body itself yields an error.
impl<S> Future for PipeToSendStream<S>
where
S: Body,
S::Error: Into<BoxError>,
{
type Output = Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut me = self.project();
loop {
// Checked at the top of every iteration: if the send stream reports a
// reset, stop piping and surface the reset reason as a body-write error.
if let Poll::Ready(reason) = me
.body_tx
.poll_reset(cx)
.map_err(crate::Error::new_body_write)?
{
debug!("stream received RST_STREAM: {:?}", reason);
return Poll::Ready(Err(Error::new_body_write(::http2::Error::from(reason))));
}
// A chunk was taken from the body on an earlier iteration and capacity
// for it has been requested; send it before polling the body again.
if me.buffered_data.is_some() {
// Wait until the send stream reports some capacity. `ready!` returns
// `Poll::Pending` from `poll` while capacity is not yet available.
while me.body_tx.capacity() == 0 {
match ready!(me.body_tx.poll_capacity(cx)) {
// Zero bytes reported: fall through so the loop condition is re-checked.
Some(Ok(0)) => {}
Some(Ok(_)) => break,
Some(Err(e)) => {
return Poll::Ready(Err(Error::new_body_write(e)));
}
// `poll_capacity` yielded `None`; this is reported as an error.
None => {
return Poll::Ready(Err(Error::new_body_write(
"send stream capacity unexpectedly closed",
)));
}
}
}
// The whole chunk is handed to `send_data` in one call, regardless of how
// much capacity was reported.
let peeked = me.buffered_data.take().expect("checked is_some above");
let buf = SendBuf::Buf(peeked.data);
me.body_tx
.send_data(buf, peeked.is_eos)
.map_err(Error::new_body_write)?;
if peeked.is_eos {
return Poll::Ready(Ok(()));
}
continue;
}
// Nothing buffered: pull the next frame from the body.
match ready!(me.stream.as_mut().poll_frame(cx)) {
Some(Ok(frame)) => {
if frame.is_data() {
// `is_data()` was just checked, so `into_data()` is not expected to fail.
let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
let is_eos = me.stream.is_end_stream();
let len = chunk.remaining();
trace!("send body chunk: {} bytes, eos={}", len, is_eos);
// Empty chunks are sent immediately, without reserving capacity first.
if len == 0 {
let buf = SendBuf::Buf(chunk);
me.body_tx
.send_data(buf, is_eos)
.map_err(Error::new_body_write)?;
if is_eos {
return Poll::Ready(Ok(()));
}
continue;
}
// Request capacity for the chunk and stash it; the next loop iteration
// waits for capacity and performs the actual send.
me.body_tx.reserve_capacity(len);
*me.buffered_data = Some(Peeked {
data: chunk,
is_eos,
});
} else if frame.is_trailers() {
// Trailers end the body: set the requested capacity to zero, send the
// trailers and finish.
me.body_tx.reserve_capacity(0);
me.body_tx
.send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!()))
.map_err(Error::new_body_write)?;
return Poll::Ready(Ok(()));
} else {
// Neither data nor trailers: the frame is dropped and the loop continues.
trace!("discarding unknown frame");
}
}
// The body failed: `on_user_err` resets the stream and builds the error.
Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
// The body ended without an end-of-stream data frame or trailers having
// been sent, so send an empty data frame with the end-of-stream flag.
None => {
return Poll::Ready(me.body_tx.send_eos_frame());
}
}
}
}
}
/// Private convenience methods for the HTTP/2 send stream, implemented below
/// for `SendStream<SendBuf<B>>`.
trait SendStreamExt {
/// Handles an error produced by the user's body and returns it as this
/// crate's `Error`. The implementation in this file also resets the stream.
fn on_user_err<E>(&mut self, err: E) -> Error
where
E: Into<BoxError>;
/// Finishes the stream; the implementation in this file sends an empty data
/// frame with the end-of-stream flag set.
fn send_eos_frame(&mut self) -> Result<()>;
}
impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
fn on_user_err<E>(&mut self, err: E) -> Error
where
E: Into<BoxError>,
{
let err = Error::new_user_body(err);
debug!("send body user stream error: {}", err);
self.send_reset(err.h2_reason());
err
}
fn send_eos_frame(&mut self) -> Result<()> {
trace!("send body eos");
self.send_data(SendBuf::None, true)
.map_err(Error::new_body_write)
}
}
/// Buffer type written to the HTTP/2 send stream.
#[repr(usize)]
enum SendBuf<B> {
/// A buffer supplied by the body.
Buf(B),
/// An owned copy of bytes; used by `H2Upgraded` when writing.
Cursor(Cursor<Box<[u8]>>),
/// No payload; used for the empty end-of-stream data frame.
None,
}
/// `Buf` implementation that delegates to whichever variant is active;
/// `SendBuf::None` behaves as an always-empty buffer.
impl<B: Buf> Buf for SendBuf<B> {
    /// Number of bytes left in the active variant (`0` for `None`).
    #[inline]
    fn remaining(&self) -> usize {
        match self {
            Self::Buf(inner) => inner.remaining(),
            Self::Cursor(cursor) => Buf::remaining(cursor),
            Self::None => 0,
        }
    }

    /// Current contiguous chunk of the active variant (empty for `None`).
    #[inline]
    fn chunk(&self) -> &[u8] {
        match self {
            Self::Buf(inner) => inner.chunk(),
            Self::Cursor(cursor) => cursor.chunk(),
            Self::None => &[],
        }
    }

    /// Advances the active variant by `cnt` bytes; a no-op for `None`.
    #[inline]
    fn advance(&mut self, cnt: usize) {
        match self {
            Self::Buf(inner) => inner.advance(cnt),
            Self::Cursor(cursor) => cursor.advance(cnt),
            Self::None => {}
        }
    }

    /// Fills `dst` using the active variant's implementation and returns the
    /// number of slices written (`0` for `None`).
    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        match self {
            Self::Buf(inner) => inner.chunks_vectored(dst),
            Self::Cursor(cursor) => cursor.chunks_vectored(dst),
            Self::None => 0,
        }
    }
}
/// Byte-stream view of a single HTTP/2 stream; it implements `AsyncRead` and
/// `AsyncWrite` below.
struct H2Upgraded<B>
where
B: Buf,
{
/// Recorder that is told the size of every received data chunk.
ping: ping::Recorder,
/// Send half of the HTTP/2 stream, used by the `AsyncWrite` impl.
send_stream: SendStream<SendBuf<B>>,
/// Receive half of the HTTP/2 stream, used by the `AsyncRead` impl.
recv_stream: RecvStream,
/// Bytes of the most recently received chunk that have not yet been copied
/// out to a reader.
buf: Bytes,
}
impl<B> AsyncRead for H2Upgraded<B>
where
    B: Buf,
{
    /// Copies received stream data into `read_buf`.
    ///
    /// When the internal buffer is empty a new chunk is awaited first. Empty
    /// chunks are skipped unless the stream has ended. The end of the stream,
    /// and `NO_ERROR` / `CANCEL` resets, are reported as a successful read
    /// that adds no bytes; `STREAM_CLOSED` becomes `BrokenPipe`; every other
    /// error is converted with `h2_to_io_error`. Flow-control capacity is
    /// released for exactly the bytes handed to the caller.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        read_buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        if self.buf.is_empty() {
            let fresh = loop {
                match ready!(self.recv_stream.poll_data(cx)) {
                    Some(Ok(data)) => {
                        // An empty chunk carries nothing to read; keep polling
                        // unless it marks the end of the stream.
                        if data.is_empty() && !self.recv_stream.is_end_stream() {
                            continue;
                        }
                        self.ping.record_data(data.len());
                        break data;
                    }
                    Some(Err(e)) => {
                        let outcome = match e.reason() {
                            Some(Reason::NO_ERROR | Reason::CANCEL) => Ok(()),
                            Some(Reason::STREAM_CLOSED) => {
                                Err(io::Error::new(io::ErrorKind::BrokenPipe, e))
                            }
                            _ => Err(h2_to_io_error(e)),
                        };
                        return Poll::Ready(outcome);
                    }
                    None => return Poll::Ready(Ok(())),
                }
            };
            self.buf = fresh;
        }

        let cnt = self.buf.len().min(read_buf.remaining());
        read_buf.put_slice(&self.buf[..cnt]);
        self.buf.advance(cnt);
        // A failure to release capacity is deliberately ignored.
        let _ = self.recv_stream.flow_control().release_capacity(cnt);
        Poll::Ready(Ok(()))
    }
}
impl<B> AsyncWrite for H2Upgraded<B>
where
B: Buf,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
if buf.is_empty() {
return Poll::Ready(Ok(0));
}
self.send_stream.reserve_capacity(buf.len());
let cnt = match ready!(self.send_stream.poll_capacity(cx)) {
None => Some(0),
Some(Ok(cnt)) => self
.send_stream
.send_data(SendBuf::Cursor(Cursor::new(buf[..cnt].into())), false)
.ok()
.map(|()| cnt),
Some(Err(_)) => None,
};
if let Some(cnt) = cnt {
return Poll::Ready(Ok(cnt));
}
Poll::Ready(Err(h2_to_io_error(
match ready!(self.send_stream.poll_reset(cx)) {
Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
}
Ok(reason) => reason.into(),
Err(e) => e,
},
)))
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self
.send_stream
.send_data(SendBuf::Cursor(Cursor::new([].into())), true)
.is_ok()
{
return Poll::Ready(Ok(()));
}
Poll::Ready(Err(h2_to_io_error(
match ready!(self.send_stream.poll_reset(cx)) {
Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())),
Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
}
Ok(reason) => reason.into(),
Err(e) => e,
},
)))
}
}
/// Converts an HTTP/2 error into an `std::io::Error`.
///
/// An error that reports itself as I/O is unwrapped into its `io::Error`;
/// any other error is wrapped with `io::Error::other`.
///
/// # Panics
///
/// Panics if `is_io()` returns `true` but `into_io()` yields nothing, which
/// would indicate a bug in the HTTP/2 library.
fn h2_to_io_error(e: http2::Error) -> std::io::Error {
    if !e.is_io() {
        return std::io::Error::other(e);
    }
    e.into_io()
        .expect("[BUG] http2::Error::is_io() is true, but into_io() failed")
}
/// Builder for [`Http2Options`], created with `Http2Options::builder()`.
#[must_use]
#[derive(Debug)]
pub struct Http2OptionsBuilder {
/// Options accumulated so far; returned unchanged by `build`.
opts: Http2Options,
}
/// HTTP/2 configuration values.
///
/// The defaults quoted on the fields are those of the `Default` impl. Note
/// that `Http2Options::builder()` starts with `max_frame_size` and
/// `max_header_list_size` set to `None` instead.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Http2Options {
/// Whether adaptive windowing is enabled. Defaults to `false`; the builder
/// resets it to `false` whenever an explicit window size is supplied.
pub adaptive_window: bool,
/// Optional initial stream id. Defaults to `None`.
pub initial_stream_id: Option<u32>,
/// Initial connection-level window size. Defaults to `DEFAULT_CONN_WINDOW`.
pub initial_conn_window_size: u32,
/// Initial stream-level window size. Defaults to `DEFAULT_STREAM_WINDOW`.
pub initial_window_size: u32,
/// Initial maximum number of send streams. Defaults to
/// `DEFAULT_INITIAL_MAX_SEND_STREAMS`.
pub initial_max_send_streams: usize,
/// Maximum frame size. Defaults to `Some(DEFAULT_MAX_FRAME_SIZE)`.
pub max_frame_size: Option<u32>,
/// Keep-alive interval; `None` by default.
pub keep_alive_interval: Option<Duration>,
/// Keep-alive timeout. Defaults to 20 seconds.
pub keep_alive_timeout: Duration,
/// Keep-alive-while-idle flag. Defaults to `false`.
pub keep_alive_while_idle: bool,
/// Maximum number of concurrent reset streams. Defaults to `None`.
pub max_concurrent_reset_streams: Option<usize>,
/// Maximum send buffer size. Defaults to `DEFAULT_MAX_SEND_BUF_SIZE`; the
/// builder rejects values above `u32::MAX`.
pub max_send_buffer_size: usize,
/// Maximum number of concurrent streams. Defaults to `None`.
pub max_concurrent_streams: Option<u32>,
/// Maximum header list size. Defaults to
/// `Some(DEFAULT_MAX_HEADER_LIST_SIZE)`.
pub max_header_list_size: Option<u32>,
/// Maximum number of pending-accept reset streams. Defaults to `None`.
pub max_pending_accept_reset_streams: Option<usize>,
/// Optional enable-push flag. Defaults to `None`.
pub enable_push: Option<bool>,
/// Optional header table size. Defaults to `None`.
pub header_table_size: Option<u32>,
/// Optional enable-connect-protocol flag. Defaults to `None`.
pub enable_connect_protocol: Option<bool>,
/// Optional no-RFC-7540-priorities flag. Defaults to `None`.
pub no_rfc7540_priorities: Option<bool>,
/// Optional pseudo-header order. Defaults to `None`.
pub headers_pseudo_order: Option<PseudoOrder>,
/// Optional stream dependency for headers. Defaults to `None`.
pub headers_stream_dependency: Option<StreamDependency>,
/// Optional settings order. Defaults to `None`.
pub settings_order: Option<SettingsOrder>,
/// Optional priorities. Defaults to `None`.
pub priorities: Option<Priorities>,
}
impl Http2OptionsBuilder {
    /// Sets the initial stream-level window size and turns adaptive windowing
    /// off. Passing `None` changes nothing.
    #[inline]
    pub fn initial_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
        let requested: Option<u32> = sz.into();
        if let Some(window) = requested {
            self.opts.initial_window_size = window;
            self.opts.adaptive_window = false;
        }
        self
    }

    /// Sets the initial connection-level window size and turns adaptive
    /// windowing off. Passing `None` changes nothing.
    #[inline]
    pub fn initial_connection_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
        let requested: Option<u32> = sz.into();
        if let Some(window) = requested {
            self.opts.initial_conn_window_size = window;
            self.opts.adaptive_window = false;
        }
        self
    }

    /// Sets the initial maximum number of send streams. Passing `None`
    /// changes nothing.
    #[inline]
    pub fn initial_max_send_streams(mut self, initial: impl Into<Option<usize>>) -> Self {
        let requested: Option<usize> = initial.into();
        if let Some(count) = requested {
            self.opts.initial_max_send_streams = count;
        }
        self
    }

    /// Sets the initial stream id. Passing `None` keeps the current value.
    #[inline]
    pub fn initial_stream_id(mut self, id: impl Into<Option<u32>>) -> Self {
        let requested: Option<u32> = id.into();
        if requested.is_some() {
            self.opts.initial_stream_id = requested;
        }
        self
    }

    /// Enables or disables adaptive windowing.
    ///
    /// Enabling it also resets both initial window sizes to
    /// `SPEC_WINDOW_SIZE`; disabling it leaves the window sizes untouched.
    #[inline]
    pub fn adaptive_window(mut self, enabled: bool) -> Self {
        let opts = &mut self.opts;
        opts.adaptive_window = enabled;
        if enabled {
            opts.initial_conn_window_size = SPEC_WINDOW_SIZE;
            opts.initial_window_size = SPEC_WINDOW_SIZE;
        }
        self
    }

    /// Sets the maximum frame size. Passing `None` keeps the current value.
    #[inline]
    pub fn max_frame_size(mut self, sz: impl Into<Option<u32>>) -> Self {
        let requested: Option<u32> = sz.into();
        if requested.is_some() {
            self.opts.max_frame_size = requested;
        }
        self
    }

    /// Sets the maximum header list size.
    #[inline]
    pub fn max_header_list_size(mut self, max: u32) -> Self {
        let limit = Some(max);
        self.opts.max_header_list_size = limit;
        self
    }

    /// Sets the header table size. Passing `None` keeps the current value.
    #[inline]
    pub fn header_table_size(mut self, size: impl Into<Option<u32>>) -> Self {
        let requested: Option<u32> = size.into();
        if requested.is_some() {
            self.opts.header_table_size = requested;
        }
        self
    }

    /// Sets the maximum number of concurrent streams. Passing `None` keeps
    /// the current value.
    #[inline]
    pub fn max_concurrent_streams(mut self, max: impl Into<Option<u32>>) -> Self {
        let requested: Option<u32> = max.into();
        if requested.is_some() {
            self.opts.max_concurrent_streams = requested;
        }
        self
    }

    /// Sets the keep-alive interval. Unlike most optional setters, passing
    /// `None` here clears any previously configured interval.
    #[inline]
    pub fn keep_alive_interval(mut self, interval: impl Into<Option<Duration>>) -> Self {
        let interval: Option<Duration> = interval.into();
        self.opts.keep_alive_interval = interval;
        self
    }

    /// Sets the keep-alive timeout.
    #[inline]
    pub fn keep_alive_timeout(mut self, timeout: Duration) -> Self {
        let opts = &mut self.opts;
        opts.keep_alive_timeout = timeout;
        self
    }

    /// Sets the keep-alive-while-idle flag.
    #[inline]
    pub fn keep_alive_while_idle(mut self, enabled: bool) -> Self {
        let opts = &mut self.opts;
        opts.keep_alive_while_idle = enabled;
        self
    }

    /// Sets the enable-push flag.
    #[inline]
    pub fn enable_push(mut self, opt: bool) -> Self {
        let flag = Some(opt);
        self.opts.enable_push = flag;
        self
    }

    /// Sets the enable-connect-protocol flag.
    #[inline]
    pub fn enable_connect_protocol(mut self, opt: bool) -> Self {
        let flag = Some(opt);
        self.opts.enable_connect_protocol = flag;
        self
    }

    /// Sets the no-RFC-7540-priorities flag.
    #[inline]
    pub fn no_rfc7540_priorities(mut self, opt: bool) -> Self {
        let flag = Some(opt);
        self.opts.no_rfc7540_priorities = flag;
        self
    }

    /// Sets the maximum number of concurrent reset streams.
    #[inline]
    pub fn max_concurrent_reset_streams(mut self, max: usize) -> Self {
        let limit = Some(max);
        self.opts.max_concurrent_reset_streams = limit;
        self
    }

    /// Sets the maximum send buffer size.
    ///
    /// # Panics
    ///
    /// Panics if `max` is larger than `u32::MAX`.
    #[inline]
    pub fn max_send_buf_size(mut self, max: usize) -> Self {
        assert!(max <= u32::MAX as usize);
        let opts = &mut self.opts;
        opts.max_send_buffer_size = max;
        self
    }

    /// Sets the maximum number of pending-accept reset streams. Passing
    /// `None` keeps the current value.
    #[inline]
    pub fn max_pending_accept_reset_streams(mut self, max: impl Into<Option<usize>>) -> Self {
        let requested: Option<usize> = max.into();
        if requested.is_some() {
            self.opts.max_pending_accept_reset_streams = requested;
        }
        self
    }

    /// Sets the stream dependency used for headers. Passing `None` keeps the
    /// current value.
    #[inline]
    pub fn headers_stream_dependency<T>(mut self, stream_dependency: T) -> Self
    where
        T: Into<Option<StreamDependency>>,
    {
        let requested: Option<StreamDependency> = stream_dependency.into();
        if requested.is_some() {
            self.opts.headers_stream_dependency = requested;
        }
        self
    }

    /// Sets the pseudo-header order. Passing `None` keeps the current value.
    #[inline]
    pub fn headers_pseudo_order<T>(mut self, headers_pseudo_order: T) -> Self
    where
        T: Into<Option<PseudoOrder>>,
    {
        let requested: Option<PseudoOrder> = headers_pseudo_order.into();
        if requested.is_some() {
            self.opts.headers_pseudo_order = requested;
        }
        self
    }

    /// Sets the settings order. Passing `None` keeps the current value.
    #[inline]
    pub fn settings_order<T>(mut self, settings_order: T) -> Self
    where
        T: Into<Option<SettingsOrder>>,
    {
        let requested: Option<SettingsOrder> = settings_order.into();
        if requested.is_some() {
            self.opts.settings_order = requested;
        }
        self
    }

    /// Sets the priorities. Passing `None` keeps the current value.
    #[inline]
    pub fn priorities<T>(mut self, priorities: T) -> Self
    where
        T: Into<Option<Priorities>>,
    {
        let requested: Option<Priorities> = priorities.into();
        if requested.is_some() {
            self.opts.priorities = requested;
        }
        self
    }

    /// Consumes the builder and returns the configured [`Http2Options`].
    #[inline]
    pub fn build(self) -> Http2Options {
        let Self { opts } = self;
        opts
    }
}
impl Http2Options {
    /// Creates a builder seeded with the default options, except that
    /// `max_frame_size` and `max_header_list_size` start out as `None`
    /// rather than their `Default` values.
    pub fn builder() -> Http2OptionsBuilder {
        let opts = Self {
            max_header_list_size: None,
            max_frame_size: None,
            ..Self::default()
        };
        Http2OptionsBuilder { opts }
    }
}
impl Default for Http2Options {
    /// Returns the default configuration: the `DEFAULT_*` constants for the
    /// window, frame, header-list and send-buffer sizes, a 20 second
    /// keep-alive timeout, adaptive windowing and keep-alive-while-idle
    /// disabled, and every remaining optional setting left as `None`.
    #[inline]
    fn default() -> Self {
        Self {
            // Flow control and stream limits.
            adaptive_window: false,
            initial_stream_id: None,
            initial_conn_window_size: DEFAULT_CONN_WINDOW,
            initial_window_size: DEFAULT_STREAM_WINDOW,
            initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
            max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
            // Keep-alive.
            keep_alive_interval: None,
            keep_alive_timeout: Duration::from_secs(20),
            keep_alive_while_idle: false,
            // Buffering and remaining limits.
            max_concurrent_reset_streams: None,
            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
            max_concurrent_streams: None,
            max_header_list_size: Some(DEFAULT_MAX_HEADER_LIST_SIZE),
            max_pending_accept_reset_streams: None,
            // Settings that are unset unless configured explicitly.
            enable_push: None,
            header_table_size: None,
            enable_connect_protocol: None,
            no_rfc7540_priorities: None,
            headers_pseudo_order: None,
            headers_stream_dependency: None,
            settings_order: None,
            priorities: None,
        }
    }
}