mod body;
mod connection;
mod driver;
mod frame;
mod handle;
mod hpack;
pub mod hpack_impl;
mod tunnel;
mod write_half;
pub use body::H2BodyTimeouts;
pub use connection::{
H2Connection as RawH2Connection, H2Error, StreamResponse, CHROME_WINDOW_UPDATE,
};
pub use driver::{DriverCommand, H2Driver};
pub use frame::{
flags, DataFrame, ErrorCode, FrameHeader, FrameType, GoAwayFrame, HeadersFrame, PingFrame,
PriorityData, PriorityFrame, PushPromiseFrame, RstStreamFrame, SettingsFrame, SettingsId,
WindowUpdateFrame, CONNECTION_PREFACE, DEFAULT_MAX_FRAME_SIZE, FRAME_HEADER_SIZE,
};
pub use handle::H2Handle;
pub use hpack::{HpackDecoder, HpackEncoder, PseudoHeaderOrder};
pub use tunnel::{H2Tunnel, H2TunnelEvent, H2TunnelOutbound};
pub(crate) use body::{H2Body, H2DirectBody, H2DirectReuseHook, DEFAULT_H2_BODY_SLOT_CAPACITY};
use handle::H2InlineState;
use bytes::Bytes;
use http::{Method, Uri};
use std::time::Duration;
use crate::error::Result;
use crate::fingerprint::http2::Http2Settings;
use crate::headers::Headers;
use crate::response::Response;
use crate::transport::connector::MaybeHttpsStream;
/// Tunable transport behaviour for a pooled HTTP/2 connection.
///
/// Consumed by [`H2PooledConnection::new_with_config`], which normalizes it and
/// passes a copy to both the driver and the handle.
#[derive(Debug, Clone)]
pub struct H2TransportConfig {
/// Keep-alive interval; `None` by default.
/// NOTE(review): presumably the PING period, with `None` disabling keep-alive — confirm in the driver.
pub keep_alive_interval: Option<Duration>,
/// Keep-alive timeout; defaults to 20 seconds.
/// NOTE(review): presumably how long to wait for a keep-alive acknowledgement — confirm in the driver.
pub keep_alive_timeout: Duration,
/// Defaults to `false`.
/// NOTE(review): presumably whether keep-alive also runs while no streams are active — confirm in the driver.
pub keep_alive_while_idle: bool,
/// Optional local cap on concurrent streams per connection. `None` or `Some(0)`
/// leaves the peer's limit as the only bound (see `effective_max_concurrent_streams`).
pub max_concurrent_streams_per_connection: Option<u32>,
/// Buffer slot count for streaming bodies; `normalized` raises it to at least 1.
pub streaming_body_buffer_slots: usize,
}
impl Default for H2TransportConfig {
fn default() -> Self {
Self {
keep_alive_interval: None,
keep_alive_timeout: Duration::from_secs(20),
keep_alive_while_idle: false,
max_concurrent_streams_per_connection: None,
streaming_body_buffer_slots: DEFAULT_H2_BODY_SLOT_CAPACITY,
}
}
}
impl H2TransportConfig {
    /// Returns this configuration with `streaming_body_buffer_slots` raised to
    /// at least 1; every other field is passed through unchanged.
    pub(crate) fn normalized(self) -> Self {
        let mut normalized = self;
        if normalized.streaming_body_buffer_slots == 0 {
            normalized.streaming_body_buffer_slots = 1;
        }
        normalized
    }

    /// Combines the peer's stream limit with the optional local cap.
    ///
    /// A positive local cap yields the smaller of the two limits. A missing
    /// cap, or a cap of zero, yields `peer_max_streams` unchanged.
    pub(crate) fn effective_max_concurrent_streams(&self, peer_max_streams: u32) -> usize {
        // A zero cap is treated the same as "no cap configured".
        let local_cap = self
            .max_concurrent_streams_per_connection
            .filter(|&cap| cap > 0);

        let effective = match local_cap {
            Some(cap) => std::cmp::min(peer_max_streams, cap),
            None => peer_max_streams,
        };
        effective as usize
    }
}
/// An HTTP/2 connection over a [`MaybeHttpsStream`] that remembers the settings
/// and pseudo-header order it was opened with.
pub struct H2Connection {
// Underlying raw connection; every request method delegates to it.
inner: RawH2Connection<MaybeHttpsStream>,
// Settings passed to `connect`; exposed through `settings()`.
settings: Http2Settings,
// Pseudo-header order passed to `connect`; exposed through `pseudo_order()`.
pseudo_order: PseudoHeaderOrder,
}
impl H2Connection {
pub async fn connect(
stream: MaybeHttpsStream,
settings: Http2Settings,
pseudo_order: PseudoHeaderOrder,
) -> Result<Self> {
let inner = RawH2Connection::connect(stream, settings.clone(), pseudo_order).await?;
Ok(Self {
inner,
settings,
pseudo_order,
})
}
pub async fn connect_chrome(stream: MaybeHttpsStream) -> Result<Self> {
Self::connect(stream, Http2Settings::default(), PseudoHeaderOrder::Chrome).await
}
pub async fn send_request(
&mut self,
method: Method,
uri: &Uri,
headers: impl Into<Headers>,
body: Option<Bytes>,
) -> Result<Response> {
let headers = headers.into();
self.inner.send_request(method, uri, &headers, body).await
}
pub async fn send_request_streaming(
&mut self,
request: http::Request<Bytes>,
) -> std::result::Result<
(
http::Response<Bytes>,
tokio::sync::mpsc::Receiver<std::result::Result<Bytes, H2Error>>,
),
crate::error::Error,
> {
self.inner.send_request_streaming(request).await
}
pub async fn read_streaming_frames(&mut self) -> Result<bool> {
self.inner.read_streaming_frames().await
}
pub fn pseudo_order(&self) -> PseudoHeaderOrder {
self.pseudo_order
}
pub fn settings(&self) -> &Http2Settings {
&self.settings
}
}
/// Cloneable front-end to an HTTP/2 connection whose I/O is run by an
/// [`H2Driver`] task spawned in [`H2PooledConnection::new_with_config`].
///
/// Every method delegates to the wrapped [`H2Handle`].
pub struct H2PooledConnection {
// Handle used to reach the spawned driver.
handle: H2Handle,
}
impl H2PooledConnection {
pub fn new(conn: H2Connection) -> Self {
Self::new_with_config(conn, H2TransportConfig::default())
}
pub fn new_with_config(conn: H2Connection, config: H2TransportConfig) -> Self {
let config = config.normalized();
const CHANNEL_BUFFER: usize = 32;
let (command_tx, command_rx) = tokio::sync::mpsc::channel(CHANNEL_BUFFER);
let goaway_received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let (inline_register_tx, inline_register_rx) = tokio::sync::mpsc::unbounded_channel();
let inline_active = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let inline_eligible = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
let body_progress_notify = std::sync::Arc::new(tokio::sync::Notify::new());
let backpressure_stall_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let write_half = conn.inner.write_half_arc();
let peer_max_frame_size = conn.inner.peer_max_frame_size_arc();
let initial_window_size = conn.inner.local_initial_window_size();
let inline_state = std::sync::Arc::new(H2InlineState {
write_half,
peer_max_frame_size,
initial_window_size,
register_tx: inline_register_tx,
inline_active: inline_active.clone(),
inline_eligible: inline_eligible.clone(),
body_progress_notify: body_progress_notify.clone(),
streaming_body_buffer_slots: config.streaming_body_buffer_slots,
});
let driver = H2Driver::new_with_inline(
conn.inner,
command_tx.clone(),
command_rx,
goaway_received.clone(),
config.clone(),
inline_register_rx,
inline_active,
inline_eligible,
body_progress_notify,
backpressure_stall_count.clone(),
);
tokio::spawn(async move {
if let Err(e) = driver.drive().await {
tracing::error!("H2Driver error: {:?}", e);
}
});
let handle = H2Handle::with_inline(
command_tx,
goaway_received,
inline_state,
config,
backpressure_stall_count,
);
Self { handle }
}
pub fn is_alive(&self) -> bool {
self.handle.is_alive()
}
pub fn backpressure_stall_count(&self) -> u64 {
self.handle.backpressure_stall_count()
}
pub async fn send_request(
&self,
method: Method,
uri: &Uri,
headers: impl Into<Headers>,
body: Option<Bytes>,
) -> Result<Response> {
self.handle.send_request(method, uri, headers, body).await
}
pub async fn send_streaming_request(
&self,
method: Method,
uri: &Uri,
headers: impl Into<Headers>,
body: crate::request::RequestBody,
body_timeouts: H2BodyTimeouts,
) -> Result<Response> {
self.handle
.send_streaming_request(method, uri, headers, body, body_timeouts)
.await
}
pub async fn open_websocket_tunnel(
&self,
uri: Uri,
headers: impl Into<Headers>,
) -> Result<H2Tunnel> {
self.handle.open_websocket_tunnel(uri, headers).await
}
pub fn clone_handle(&self) -> Self {
Self {
handle: self.handle.clone(),
}
}
}
impl Clone for H2PooledConnection {
fn clone(&self) -> Self {
self.clone_handle()
}
}
/// Builder that collects HTTP/2 settings and a pseudo-header order, then opens
/// an [`H2Connection`] with them via `connect`.
pub struct H2ClientBuilder {
// Settings handed to `H2Connection::connect`.
settings: Http2Settings,
// Pseudo-header order handed to `H2Connection::connect`.
pseudo_order: PseudoHeaderOrder,
}
impl H2ClientBuilder {
    /// Creates a builder with `Http2Settings::default()` and
    /// `PseudoHeaderOrder::Chrome`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            settings: Http2Settings::default(),
            pseudo_order: PseudoHeaderOrder::Chrome,
        }
    }

    /// Replaces the whole settings value, discarding any per-field changes
    /// made earlier on this builder.
    #[must_use = "builder methods consume `self` and return the updated builder"]
    pub fn settings(mut self, settings: Http2Settings) -> Self {
        self.settings = settings;
        self
    }

    /// Sets the pseudo-header order used when connecting.
    #[must_use = "builder methods consume `self` and return the updated builder"]
    pub fn pseudo_order(mut self, order: PseudoHeaderOrder) -> Self {
        self.pseudo_order = order;
        self
    }

    /// Sets `header_table_size` in the settings.
    #[must_use = "builder methods consume `self` and return the updated builder"]
    pub fn header_table_size(mut self, size: u32) -> Self {
        self.settings.header_table_size = size;
        self
    }

    /// Sets `initial_window_size` in the settings.
    #[must_use = "builder methods consume `self` and return the updated builder"]
    pub fn initial_window_size(mut self, size: u32) -> Self {
        self.settings.initial_window_size = size;
        self
    }

    /// Sets `max_concurrent_streams` in the settings.
    #[must_use = "builder methods consume `self` and return the updated builder"]
    pub fn max_concurrent_streams(mut self, max: u32) -> Self {
        self.settings.max_concurrent_streams = max;
        self
    }

    /// Sets `max_frame_size` in the settings.
    #[must_use = "builder methods consume `self` and return the updated builder"]
    pub fn max_frame_size(mut self, size: u32) -> Self {
        self.settings.max_frame_size = size;
        self
    }

    /// Sets `max_header_list_size` in the settings.
    #[must_use = "builder methods consume `self` and return the updated builder"]
    pub fn max_header_list_size(mut self, size: u32) -> Self {
        self.settings.max_header_list_size = size;
        self
    }

    /// Sets `enable_push` in the settings.
    #[must_use = "builder methods consume `self` and return the updated builder"]
    pub fn enable_push(mut self, enable: bool) -> Self {
        self.settings.enable_push = enable;
        self
    }

    /// Consumes the builder and opens an [`H2Connection`] on `stream` with the
    /// accumulated settings and pseudo-header order.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`H2Connection::connect`].
    pub async fn connect(self, stream: MaybeHttpsStream) -> Result<H2Connection> {
        H2Connection::connect(stream, self.settings, self.pseudo_order).await
    }

    /// Returns the settings accumulated so far.
    ///
    /// The `get_` prefix is kept because `settings` is already the setter.
    pub fn get_settings(&self) -> &Http2Settings {
        &self.settings
    }

    /// Returns the pseudo-header order configured so far.
    ///
    /// The `get_` prefix is kept because `pseudo_order` is already the setter.
    pub fn get_pseudo_order(&self) -> PseudoHeaderOrder {
        self.pseudo_order
    }
}
impl Default for H2ClientBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default settings must stay aligned with the expected Chrome values.
    #[test]
    fn test_default_settings_match_chrome() {
        let settings = Http2Settings::default();
        assert_eq!(settings.header_table_size, 65536);
        assert_eq!(settings.initial_window_size, 6291456);
        assert_eq!(settings.max_concurrent_streams, 1000);
        assert_eq!(settings.max_frame_size, 16384);
        assert_eq!(settings.max_header_list_size, 262144);
        assert!(!settings.enable_push);
    }

    #[test]
    fn test_builder_settings() {
        let builder = H2ClientBuilder::new()
            .header_table_size(4096)
            .initial_window_size(65535)
            .max_concurrent_streams(100);
        assert_eq!(builder.settings.header_table_size, 4096);
        assert_eq!(builder.settings.initial_window_size, 65535);
        assert_eq!(builder.settings.max_concurrent_streams, 100);
    }

    /// Covers the setters that `test_builder_settings` does not exercise.
    #[test]
    fn test_builder_remaining_setters() {
        let builder = H2ClientBuilder::new()
            .max_frame_size(32768)
            .max_header_list_size(8192)
            .enable_push(true);
        assert_eq!(builder.settings.max_frame_size, 32768);
        assert_eq!(builder.settings.max_header_list_size, 8192);
        assert!(builder.settings.enable_push);
    }

    /// `settings()` replaces the whole value; later setters still apply on top.
    #[test]
    fn test_builder_settings_replacement() {
        let custom = Http2Settings {
            max_frame_size: 32768,
            ..Http2Settings::default()
        };

        let builder = H2ClientBuilder::new()
            .header_table_size(4096)
            .settings(custom);
        assert_eq!(builder.get_settings().max_frame_size, 32768);
        assert_eq!(
            builder.get_settings().header_table_size,
            Http2Settings::default().header_table_size
        );

        let builder = builder.max_frame_size(20000);
        assert_eq!(builder.get_settings().max_frame_size, 20000);
    }

    /// The getters and `Default` must agree with `new()`.
    #[test]
    fn test_builder_getters_and_default() {
        let builder = H2ClientBuilder::default();
        assert_eq!(builder.get_pseudo_order(), PseudoHeaderOrder::Chrome);
        assert_eq!(
            builder.get_settings().header_table_size,
            Http2Settings::default().header_table_size
        );

        let builder = builder.pseudo_order(PseudoHeaderOrder::Firefox);
        assert_eq!(builder.get_pseudo_order(), PseudoHeaderOrder::Firefox);
    }

    #[test]
    fn test_builder_pseudo_order() {
        let builder = H2ClientBuilder::new();
        assert_eq!(builder.pseudo_order, PseudoHeaderOrder::Chrome);
        let builder = builder.pseudo_order(PseudoHeaderOrder::Firefox);
        assert_eq!(builder.pseudo_order, PseudoHeaderOrder::Firefox);
    }

    #[test]
    fn test_pseudo_order_akamai_strings() {
        assert_eq!(PseudoHeaderOrder::Chrome.akamai_string(), "m,s,a,p");
        assert_eq!(PseudoHeaderOrder::Firefox.akamai_string(), "m,p,a,s");
        assert_eq!(PseudoHeaderOrder::Safari.akamai_string(), "m,s,p,a");
        assert_eq!(PseudoHeaderOrder::Standard.akamai_string(), "m,a,s,p");
    }
}