#[macro_use]
mod macros;
mod client_stream;
mod common_flags;
mod common_stream;
mod continuation_frame;
mod data_frame;
mod frame_init;
mod frame_reader;
mod go_away_frame;
mod headers_frame;
mod hpack_decoder;
mod hpack_encoder;
mod hpack_header;
mod hpack_headers;
mod hpack_static_headers;
mod http2_buffer;
mod http2_data;
mod http2_error;
mod http2_error_code;
mod http2_status;
mod http_send_params;
mod huffman;
mod huffman_tables;
mod initial_server_stream_remote;
mod misc;
mod ping_frame;
mod process_receipt_frame_ty;
mod reader_data;
mod reset_stream_frame;
mod server_stream;
mod settings_frame;
mod stream_receiver;
mod stream_state;
#[cfg(all(feature = "_async-tests", test))]
mod tests;
#[cfg(feature = "web-socket")]
mod web_socket_over_stream;
mod window;
mod window_update_frame;
mod write_functions;
mod writer_data;
use crate::{
http::{DEFAULT_INITIAL_WINDOW_LEN, HttpRecvParams, Protocol, ReqResBuffer, Request, u31::U31},
http2::settings_frame::SettingsFrame,
misc::{ConnectionState, Lease, LeaseMut, SingleTypeStorage, Usize},
stream::{StreamReader, StreamWriter},
sync::{Arc, AsyncMutex, AtomicBool, AtomicWaker},
};
pub use client_stream::ClientStream;
pub use common_stream::CommonStream;
use core::{
future::poll_fn,
mem,
pin::pin,
task::{Poll, Waker},
};
use hashbrown::HashMap;
pub use http2_buffer::Http2Buffer;
pub use http2_data::Http2Data;
pub use http2_error::Http2Error;
pub use http2_error_code::Http2ErrorCode;
pub use http2_status::{Http2RecvStatus, Http2SendStatus};
pub use server_stream::ServerStream;
#[cfg(feature = "web-socket")]
pub use web_socket_over_stream::WebSocketOverStream;
pub use window::{Window, Windows};
const PREFACE: &[u8; 24] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
pub(crate) type Scrp = HashMap<U31, stream_receiver::StreamControlRecvParams>;
pub(crate) type Sorp = HashMap<U31, stream_receiver::StreamOverallRecvParams>;
#[derive(Debug)]
pub struct Http2<HB, SW, const IS_CLIENT: bool> {
inner: Arc<Http2Inner<HB, SW, IS_CLIENT>>,
}
impl<HB, SW, const IS_CLIENT: bool> Http2<HB, SW, IS_CLIENT>
where
HB: LeaseMut<Http2Buffer>,
SW: StreamWriter,
{
#[inline]
pub fn connection_state(&self) -> ConnectionState {
misc::connection_state(&self.inner.is_conn_open)
}
send_go_away_method!();
#[cfg(all(feature = "http-client-pool", feature = "tokio"))]
pub(crate) async fn swap_buffers(&mut self, hb: &mut HB) {
mem::swap(hb.lease_mut(), self.inner.hd.lock().await.parts_mut().hb);
}
async fn manage_initial_params<SR, const HAS_PREFACE: bool>(
mut hb: HB,
hp: HttpRecvParams,
stream_reader: SR,
mut stream_writer: SW,
) -> crate::Result<(impl Future<Output = ()>, Self)>
where
SR: StreamReader,
{
let is_conn_open = AtomicBool::new(true);
let sf = SettingsFrame::from_hp(hp);
let sf_buffer = &mut [0; 45];
let sf_bytes = sf.bytes(sf_buffer);
if hp.initial_window_len() == DEFAULT_INITIAL_WINDOW_LEN {
if HAS_PREFACE {
misc::write_array([PREFACE, sf_bytes], &is_conn_open, &mut stream_writer).await?;
} else {
misc::write_array([sf_bytes], &is_conn_open, &mut stream_writer).await?;
}
} else {
let wuf = window_update_frame::WindowUpdateFrame::new(
hp.initial_window_len().wrapping_sub(DEFAULT_INITIAL_WINDOW_LEN).into(),
U31::ZERO,
)?;
if HAS_PREFACE {
let array = [PREFACE, sf_bytes, &wuf.bytes()];
misc::write_array(array, &is_conn_open, &mut stream_writer).await?;
} else {
misc::write_array([sf_bytes, &wuf.bytes()], &is_conn_open, &mut stream_writer).await?;
}
}
hb.lease_mut().hpack_dec.set_max_bytes(hp.max_hpack_len().0);
hb.lease_mut().hpack_enc.set_max_dyn_super_bytes(hp.max_hpack_len().1);
let rd = reader_data::ReaderData::new(mem::take(&mut hb.lease_mut().pfb), stream_reader);
let max_frame_len = hp.max_frame_len();
let wd = writer_data::WriterData::new(stream_writer);
let inner = Arc::new(Http2Inner {
hd: AsyncMutex::new(Http2Data::new(hb, hp)),
is_conn_open,
read_frame_waker: AtomicWaker::new(),
wd: AsyncMutex::new(wd),
});
Ok((frame_reader::frame_reader(inner.clone(), max_frame_len, rd), Self { inner }))
}
}
impl<HB, SW> Http2<HB, SW, false>
where
HB: LeaseMut<Http2Buffer>,
SW: StreamWriter,
{
#[inline]
pub async fn accept<SR>(
mut hb: HB,
hp: HttpRecvParams,
(mut stream_reader, mut stream_writer): (SR, SW),
) -> crate::Result<(impl Future<Output = ()>, Self)>
where
SR: StreamReader,
{
hb.lease_mut().clear();
let mut buffer = [0; 24];
let _ = stream_reader.read(&mut buffer).await?;
if &buffer != PREFACE {
let _rslt = stream_writer
.write_all(
&go_away_frame::GoAwayFrame::new(Http2ErrorCode::ProtocolError, U31::ZERO).bytes(),
)
.await;
return Err(misc::protocol_err(Http2Error::NoPreface));
}
Self::manage_initial_params::<_, false>(hb, hp, stream_reader, stream_writer).await
}
#[inline]
pub async fn stream<T>(
&self,
mut cb: impl FnMut(Request<&mut ReqResBuffer>, Option<Protocol>) -> T,
) -> crate::Result<Option<(ServerStream<HB, SW>, T)>> {
let Self { inner } = self;
let mut is_registered = false;
let mut lock_pin = pin!(inner.hd.lock());
poll_fn(|cx| {
let mut guard = lock_pin!(cx, inner.hd, lock_pin);
let hdpm = guard.parts_mut();
if misc::connection_state(&inner.is_conn_open).is_closed() {
misc::frame_reader_rslt(hdpm.frame_reader_error)?;
return Poll::Ready(Ok(None));
}
let Some(lss) = hdpm.hb.initial_server_streams_remote.pop_front() else {
if is_registered {
misc::frame_reader_rslt(hdpm.frame_reader_error)?;
return Poll::Ready(Ok(None));
}
hdpm.hb.initial_server_streams_local.push_back(cx.waker().clone())?;
is_registered = true;
return Poll::Pending;
};
let Some(sorp) = hdpm.hb.sorps.get_mut(&lss.stream_id) else {
misc::frame_reader_rslt(hdpm.frame_reader_error)?;
return Poll::Ready(Ok(None));
};
let (method, protocol, stream_id) = (lss.method, lss.protocol, lss.stream_id);
Poll::Ready(Ok(Some((
ServerStream::new(
inner.clone(),
method,
protocol,
_trace_span!("New server stream", stream_id = %stream_id),
stream_id,
),
cb(Request::http2(method, &mut sorp.rrb), protocol),
))))
})
.await
}
}
impl<HB, SW> Http2<HB, SW, true>
where
HB: LeaseMut<Http2Buffer>,
SW: StreamWriter,
{
#[inline]
pub async fn connect<SR>(
mut hb: HB,
mut hp: HttpRecvParams,
(stream_reader, stream_writer): (SR, SW),
) -> crate::Result<(impl Future<Output = ()>, Self)>
where
SR: StreamReader,
{
hb.lease_mut().clear();
hp = hp.set_enable_connect_protocol(false);
Self::manage_initial_params::<_, true>(hb, hp, stream_reader, stream_writer).await
}
#[inline]
pub async fn stream(&self) -> crate::Result<ClientStream<HB, SW>> {
let Self { inner } = self;
let mut hd_guard = inner.hd.lock().await;
let hdpm = hd_guard.parts_mut();
if hdpm.hb.sorps.len() >= *Usize::from(hdpm.hp.max_concurrent_streams_num()) {
drop(hd_guard);
let err = misc::protocol_err(Http2Error::ExceedAmountOfActiveConcurrentStreams);
misc::process_higher_operation_err(&err, inner).await;
return Err(err);
}
let stream_id = *hdpm.last_stream_id;
let span = _trace_span!("New client stream", stream_id = %stream_id);
drop(hdpm.hb.scrps.insert(
stream_id,
stream_receiver::StreamControlRecvParams {
is_stream_open: true,
stream_state: stream_state::StreamState::Idle,
waker: Waker::noop().clone(),
windows: Windows::initial(hdpm.hp, hdpm.hps),
},
));
*hdpm.last_stream_id = hdpm.last_stream_id.wrapping_add(U31::TWO);
drop(hd_guard);
Ok(ClientStream::new(inner.clone(), span, stream_id))
}
}
impl<HB, SW, const IS_CLIENT: bool> Lease<Http2<HB, SW, IS_CLIENT>> for Http2<HB, SW, IS_CLIENT> {
#[inline]
fn lease(&self) -> &Http2<HB, SW, IS_CLIENT> {
self
}
}
impl<HB, SW, const IS_CLIENT: bool> LeaseMut<Http2<HB, SW, IS_CLIENT>>
for Http2<HB, SW, IS_CLIENT>
{
#[inline]
fn lease_mut(&mut self) -> &mut Http2<HB, SW, IS_CLIENT> {
self
}
}
impl<HB, SW, const IS_CLIENT: bool> SingleTypeStorage for Http2<HB, SW, IS_CLIENT> {
type Item = (HB, SW);
}
impl<HB, SW, const IS_CLIENT: bool> Clone for Http2<HB, SW, IS_CLIENT> {
#[inline]
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
#[derive(Debug)]
pub(crate) struct Http2Inner<HB, SW, const IS_CLIENT: bool> {
pub(crate) hd: AsyncMutex<Http2Data<HB, IS_CLIENT>>,
pub(crate) is_conn_open: AtomicBool,
pub(crate) read_frame_waker: AtomicWaker,
pub(crate) wd: AsyncMutex<writer_data::WriterData<SW>>,
}