1mod body;
36mod connection;
37mod driver;
38mod frame;
39mod handle;
40mod hpack;
41pub mod hpack_impl;
42mod tunnel;
43mod write_half;
44
45pub use body::H2BodyTimeouts;
46pub use connection::{
47 H2Connection as RawH2Connection, H2Error, StreamResponse, CHROME_WINDOW_UPDATE,
48};
49pub use driver::{DriverCommand, H2Driver};
50pub use frame::{
51 flags, DataFrame, ErrorCode, FrameHeader, FrameType, GoAwayFrame, HeadersFrame, PingFrame,
52 PriorityData, PriorityFrame, PushPromiseFrame, RstStreamFrame, SettingsFrame, SettingsId,
53 WindowUpdateFrame, CONNECTION_PREFACE, DEFAULT_MAX_FRAME_SIZE, FRAME_HEADER_SIZE,
54};
55pub use handle::H2Handle;
56pub use hpack::{HpackDecoder, HpackEncoder, PseudoHeaderOrder};
57pub use tunnel::{H2Tunnel, H2TunnelEvent, H2TunnelOutbound};
58
59pub(crate) use body::{H2Body, H2DirectBody, H2DirectReuseHook, DEFAULT_H2_BODY_SLOT_CAPACITY};
60use handle::H2InlineState;
61
62use bytes::Bytes;
64use http::{Method, Uri};
65use std::time::Duration;
66
67use crate::error::Result;
68use crate::fingerprint::http2::Http2Settings;
69use crate::headers::Headers;
70use crate::response::Response;
71use crate::transport::connector::MaybeHttpsStream;
72
73#[derive(Debug, Clone)]
75pub struct H2TransportConfig {
76 pub keep_alive_interval: Option<Duration>,
77 pub keep_alive_timeout: Duration,
78 pub keep_alive_while_idle: bool,
79 pub max_concurrent_streams_per_connection: Option<u32>,
80 pub streaming_body_buffer_slots: usize,
81}
82
83impl Default for H2TransportConfig {
84 fn default() -> Self {
85 Self {
86 keep_alive_interval: None,
87 keep_alive_timeout: Duration::from_secs(20),
88 keep_alive_while_idle: false,
89 max_concurrent_streams_per_connection: None,
90 streaming_body_buffer_slots: DEFAULT_H2_BODY_SLOT_CAPACITY,
91 }
92 }
93}
94
95impl H2TransportConfig {
96 pub(crate) fn normalized(mut self) -> Self {
97 self.streaming_body_buffer_slots = self.streaming_body_buffer_slots.max(1);
98 self
99 }
100
101 pub(crate) fn effective_max_concurrent_streams(&self, peer_max_streams: u32) -> usize {
102 match self.max_concurrent_streams_per_connection {
103 Some(local_max) if local_max > 0 => peer_max_streams.min(local_max) as usize,
104 _ => peer_max_streams as usize,
105 }
106 }
107}
108
109pub struct H2Connection {
111 inner: RawH2Connection<MaybeHttpsStream>,
113 settings: Http2Settings,
115 pseudo_order: PseudoHeaderOrder,
117}
118
119impl H2Connection {
120 pub async fn connect(
125 stream: MaybeHttpsStream,
126 settings: Http2Settings,
127 pseudo_order: PseudoHeaderOrder,
128 ) -> Result<Self> {
129 let inner = RawH2Connection::connect(stream, settings.clone(), pseudo_order).await?;
130
131 Ok(Self {
132 inner,
133 settings,
134 pseudo_order,
135 })
136 }
137
138 pub async fn connect_chrome(stream: MaybeHttpsStream) -> Result<Self> {
140 Self::connect(stream, Http2Settings::default(), PseudoHeaderOrder::Chrome).await
141 }
142
143 pub async fn send_request(
145 &mut self,
146 method: Method,
147 uri: &Uri,
148 headers: impl Into<Headers>,
149 body: Option<Bytes>,
150 ) -> Result<Response> {
151 let headers = headers.into();
152 self.inner.send_request(method, uri, &headers, body).await
153 }
154
155 pub async fn send_request_streaming(
158 &mut self,
159 request: http::Request<Bytes>,
160 ) -> std::result::Result<
161 (
162 http::Response<Bytes>,
163 tokio::sync::mpsc::Receiver<std::result::Result<Bytes, H2Error>>,
164 ),
165 crate::error::Error,
166 > {
167 self.inner.send_request_streaming(request).await
168 }
169
170 pub async fn read_streaming_frames(&mut self) -> Result<bool> {
173 self.inner.read_streaming_frames().await
174 }
175
176 pub fn pseudo_order(&self) -> PseudoHeaderOrder {
178 self.pseudo_order
179 }
180
181 pub fn settings(&self) -> &Http2Settings {
183 &self.settings
184 }
185}
186
187pub struct H2PooledConnection {
192 handle: H2Handle,
193}
194
195impl H2PooledConnection {
196 pub fn new(conn: H2Connection) -> Self {
199 Self::new_with_config(conn, H2TransportConfig::default())
200 }
201
202 pub fn new_with_config(conn: H2Connection, config: H2TransportConfig) -> Self {
204 let config = config.normalized();
205 const CHANNEL_BUFFER: usize = 32;
206 let (command_tx, command_rx) = tokio::sync::mpsc::channel(CHANNEL_BUFFER);
207 let goaway_received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
208
209 let (inline_register_tx, inline_register_rx) = tokio::sync::mpsc::unbounded_channel();
210 let inline_active = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
211 let inline_eligible = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
212 let body_progress_notify = std::sync::Arc::new(tokio::sync::Notify::new());
213 let backpressure_stall_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
214
215 let write_half = conn.inner.write_half_arc();
216 let peer_max_frame_size = conn.inner.peer_max_frame_size_arc();
217 let initial_window_size = conn.inner.local_initial_window_size();
218
219 let inline_state = std::sync::Arc::new(H2InlineState {
220 write_half,
221 peer_max_frame_size,
222 initial_window_size,
223 register_tx: inline_register_tx,
224 inline_active: inline_active.clone(),
225 inline_eligible: inline_eligible.clone(),
226 body_progress_notify: body_progress_notify.clone(),
227 streaming_body_buffer_slots: config.streaming_body_buffer_slots,
228 });
229
230 let driver = H2Driver::new_with_inline(
231 conn.inner,
232 command_tx.clone(),
233 command_rx,
234 goaway_received.clone(),
235 config.clone(),
236 inline_register_rx,
237 inline_active,
238 inline_eligible,
239 body_progress_notify,
240 backpressure_stall_count.clone(),
241 );
242
243 tokio::spawn(async move {
245 if let Err(e) = driver.drive().await {
246 tracing::error!("H2Driver error: {:?}", e);
247 }
248 });
249
250 let handle = H2Handle::with_inline(
251 command_tx,
252 goaway_received,
253 inline_state,
254 config,
255 backpressure_stall_count,
256 );
257 Self { handle }
258 }
259
260 pub fn is_alive(&self) -> bool {
262 self.handle.is_alive()
263 }
264
265 pub fn backpressure_stall_count(&self) -> u64 {
267 self.handle.backpressure_stall_count()
268 }
269
270 pub async fn send_request(
273 &self,
274 method: Method,
275 uri: &Uri,
276 headers: impl Into<Headers>,
277 body: Option<Bytes>,
278 ) -> Result<Response> {
279 self.handle.send_request(method, uri, headers, body).await
280 }
281
282 pub async fn send_streaming_request(
284 &self,
285 method: Method,
286 uri: &Uri,
287 headers: impl Into<Headers>,
288 body: crate::request::RequestBody,
289 body_timeouts: H2BodyTimeouts,
290 ) -> Result<Response> {
291 self.handle
292 .send_streaming_request(method, uri, headers, body, body_timeouts)
293 .await
294 }
295
296 pub async fn open_websocket_tunnel(
298 &self,
299 uri: Uri,
300 headers: impl Into<Headers>,
301 ) -> Result<H2Tunnel> {
302 self.handle.open_websocket_tunnel(uri, headers).await
303 }
304
305 pub fn clone_handle(&self) -> Self {
308 Self {
309 handle: self.handle.clone(),
310 }
311 }
312}
313
314impl Clone for H2PooledConnection {
315 fn clone(&self) -> Self {
316 self.clone_handle()
317 }
318}
319
320pub struct H2ClientBuilder {
322 settings: Http2Settings,
323 pseudo_order: PseudoHeaderOrder,
324}
325
326impl H2ClientBuilder {
327 pub fn new() -> Self {
329 Self {
330 settings: Http2Settings::default(),
331 pseudo_order: PseudoHeaderOrder::Chrome,
332 }
333 }
334
335 pub fn settings(mut self, settings: Http2Settings) -> Self {
337 self.settings = settings;
338 self
339 }
340
341 pub fn pseudo_order(mut self, order: PseudoHeaderOrder) -> Self {
343 self.pseudo_order = order;
344 self
345 }
346
347 pub fn header_table_size(mut self, size: u32) -> Self {
349 self.settings.header_table_size = size;
350 self
351 }
352
353 pub fn initial_window_size(mut self, size: u32) -> Self {
355 self.settings.initial_window_size = size;
356 self
357 }
358
359 pub fn max_concurrent_streams(mut self, max: u32) -> Self {
361 self.settings.max_concurrent_streams = max;
362 self
363 }
364
365 pub fn max_frame_size(mut self, size: u32) -> Self {
367 self.settings.max_frame_size = size;
368 self
369 }
370
371 pub fn max_header_list_size(mut self, size: u32) -> Self {
373 self.settings.max_header_list_size = size;
374 self
375 }
376
377 pub fn enable_push(mut self, enable: bool) -> Self {
379 self.settings.enable_push = enable;
380 self
381 }
382
383 pub async fn connect(self, stream: MaybeHttpsStream) -> Result<H2Connection> {
385 H2Connection::connect(stream, self.settings, self.pseudo_order).await
386 }
387
388 pub fn get_settings(&self) -> &Http2Settings {
390 &self.settings
391 }
392
393 pub fn get_pseudo_order(&self) -> PseudoHeaderOrder {
395 self.pseudo_order
396 }
397}
398
399impl Default for H2ClientBuilder {
400 fn default() -> Self {
401 Self::new()
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use super::*;
408
409 #[test]
410 fn test_default_settings_match_chrome() {
411 let settings = Http2Settings::default();
412 assert_eq!(settings.header_table_size, 65536);
413 assert_eq!(settings.initial_window_size, 6291456);
414 assert_eq!(settings.max_concurrent_streams, 1000);
415 assert_eq!(settings.max_frame_size, 16384);
416 assert_eq!(settings.max_header_list_size, 262144);
417 assert!(!settings.enable_push);
418 }
419
420 #[test]
421 fn test_builder_settings() {
422 let builder = H2ClientBuilder::new()
423 .header_table_size(4096)
424 .initial_window_size(65535)
425 .max_concurrent_streams(100);
426
427 assert_eq!(builder.settings.header_table_size, 4096);
428 assert_eq!(builder.settings.initial_window_size, 65535);
429 assert_eq!(builder.settings.max_concurrent_streams, 100);
430 }
431
432 #[test]
433 fn test_builder_pseudo_order() {
434 let builder = H2ClientBuilder::new();
435 assert_eq!(builder.pseudo_order, PseudoHeaderOrder::Chrome);
436
437 let builder = builder.pseudo_order(PseudoHeaderOrder::Firefox);
438 assert_eq!(builder.pseudo_order, PseudoHeaderOrder::Firefox);
439 }
440
441 #[test]
442 fn test_pseudo_order_akamai_strings() {
443 assert_eq!(PseudoHeaderOrder::Chrome.akamai_string(), "m,s,a,p");
444 assert_eq!(PseudoHeaderOrder::Firefox.akamai_string(), "m,p,a,s");
445 assert_eq!(PseudoHeaderOrder::Safari.akamai_string(), "m,s,p,a");
446 assert_eq!(PseudoHeaderOrder::Standard.akamai_string(), "m,a,s,p");
447 }
448}