1use crate::url::Url;
11use base64::Engine;
12use bytes::Bytes;
13use http::{Method, Uri};
14use serde::Serialize;
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::{Arc, Mutex as StdMutex};
19use std::time::Duration;
20use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};
21use tokio::time::timeout as tokio_timeout;
22
23use crate::cookie::CookieJar;
24use crate::error::{Error, Result};
25use crate::fingerprint::{http2::Http2Settings, FingerprintProfile, Http3Fingerprint};
26use crate::headers::Headers;
27use crate::pool::alt_svc::AltSvcCache;
28use crate::pool::multiplexer::{ConnectionPool, PoolKey};
29use crate::request::{IntoUrl, RedirectPolicy, Request, RequestBody};
30use crate::response::{Body, Response};
31use crate::timeouts::Timeouts;
32use crate::transport::connector::{AlpnMode, BoringConnector, EarlyDataOutcome, MaybeHttpsStream};
33use crate::transport::dns::{DnsConfig, Resolve};
34use crate::transport::h1::{h1_request_body_kind, H1Connection, H1StreamingOptions};
35use crate::transport::h2::{
36 H2BodyTimeouts, H2Connection, H2DirectBody, H2DirectReuseHook, H2PooledConnection,
37 H2TransportConfig, H2Tunnel, PseudoHeaderOrder, RawH2Connection,
38};
39use crate::transport::h3::{H3Backend, H3Client, H3TransportConfig, H3Tunnel};
40use crate::transport::is_zero_rtt_safe_request;
41use crate::transport::session::SessionCache;
42use crate::transport::tcp::TcpFingerprint;
43use crate::version::HttpVersion;
44use crate::websocket::{WebSocketBuilder, WebSocketClientParts};
45
46type H2DirectPool = Arc<StdMutex<HashMap<PoolKey, Vec<RawH2Connection<MaybeHttpsStream>>>>>;
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub struct CapacityPolicy {
55 pub max_pending_per_origin: usize,
56 pub streaming_body_buffer_slots: usize,
57 pub h3_tunnel_outbound_byte_budget: usize,
58 pub h3_tunnel_inbound_byte_budget: usize,
59}
60
61impl CapacityPolicy {
62 pub fn bounded(max_pending_per_origin: usize) -> Self {
63 let normalized = max_pending_per_origin.max(1);
64 Self {
65 max_pending_per_origin: normalized,
66 streaming_body_buffer_slots: normalized,
67 h3_tunnel_outbound_byte_budget: H3TransportConfig::default()
68 .tunnel_outbound_byte_budget,
69 h3_tunnel_inbound_byte_budget: H3TransportConfig::default().tunnel_inbound_byte_budget,
70 }
71 }
72
73 pub fn with_streaming_body_buffer_slots(mut self, slots: usize) -> Self {
78 self.streaming_body_buffer_slots = slots.max(1);
79 self
80 }
81
82 pub fn with_h3_tunnel_byte_budget(mut self, bytes: usize) -> Self {
83 let bytes = bytes
84 .max(crate::transport::h3::MIN_H3_TUNNEL_OUTBOUND_BYTE_BUDGET)
85 .max(crate::transport::h3::MIN_H3_TUNNEL_INBOUND_BYTE_BUDGET);
86 self.h3_tunnel_outbound_byte_budget = bytes;
87 self.h3_tunnel_inbound_byte_budget = bytes;
88 self
89 }
90
91 pub fn with_h3_tunnel_outbound_byte_budget(mut self, bytes: usize) -> Self {
92 self.h3_tunnel_outbound_byte_budget =
93 bytes.max(crate::transport::h3::MIN_H3_TUNNEL_OUTBOUND_BYTE_BUDGET);
94 self
95 }
96
97 pub fn with_h3_tunnel_inbound_byte_budget(mut self, bytes: usize) -> Self {
98 self.h3_tunnel_inbound_byte_budget =
99 bytes.max(crate::transport::h3::MIN_H3_TUNNEL_INBOUND_BYTE_BUDGET);
100 self
101 }
102}
103
104struct H2DirectStart {
105 conn: RawH2Connection<MaybeHttpsStream>,
106 stream_id: u32,
107 status: u16,
108 headers: Vec<(String, String)>,
109 end_stream: bool,
110}
111
112struct H2DirectResponseRequest {
113 conn: RawH2Connection<MaybeHttpsStream>,
114 method: Method,
115 uri: Uri,
116 headers: Headers,
117 body_timeouts: H2BodyTimeouts,
118 pool_key: PoolKey,
119 ttfb_timeout: Option<Duration>,
120}
121
122#[derive(Clone)]
131pub struct Client {
132 connector: BoringConnector,
133 insecure_connector: BoringConnector,
135 h3_client: H3Client,
136 alt_svc_cache: Arc<AltSvcCache>,
137 h2_pool: Arc<RwLock<HashMap<PoolKey, H2PooledConnection>>>,
139 h2_direct_pool: H2DirectPool,
141 h1_pool: Arc<ConnectionPool>,
143 h1_connection_slots: Arc<RwLock<HashMap<PoolKey, Arc<Semaphore>>>>,
145 h1_max_connections_per_origin: usize,
146 http2_settings: Http2Settings,
147 pseudo_order: PseudoHeaderOrder,
148 default_version: HttpVersion,
149 timeouts: Timeouts,
151 h2_transport_config: H2TransportConfig,
153 h2_direct_streaming_responses: bool,
156 h3_upgrade_enabled: bool,
158 http2_prior_knowledge: bool,
160 danger_accept_invalid_certs: bool,
162 localhost_allows_invalid_certs: bool,
164 default_headers: Headers,
166 redirect_policy: RedirectPolicy,
168 cookie_store: Option<Arc<RwLock<CookieJar>>>,
170 fingerprint: FingerprintProfile,
172 http_tls_early_data: bool,
174 pool_reuse_counter: Arc<AtomicUsize>,
178}
179
180pub struct RequestBuilder<'a> {
182 client: &'a Client,
183 url: Option<Url>,
184 method: Method,
185 headers: Headers,
186 body: RequestBody,
187 version: Option<HttpVersion>,
188 timeout: Option<Duration>,
189 error: Option<Error>,
190}
191
192pub struct WebSocketH2Builder<'a> {
194 client: &'a Client,
195 url: Option<Url>,
196 headers: Headers,
197 error: Option<Error>,
198}
199
200pub struct WebSocketH3Builder<'a> {
202 client: &'a Client,
203 url: Option<Url>,
204 headers: Headers,
205 error: Option<Error>,
206}
207
208pub struct ClientBuilder {
210 fingerprint: FingerprintProfile,
211 http2_settings: Option<Http2Settings>,
212 pseudo_order: Option<PseudoHeaderOrder>,
213 timeouts: Timeouts,
214 dns_config: DnsConfig,
215 pool_idle_timeout: Duration,
216 pool_max_idle_per_host: usize,
217 h1_max_connections_per_origin: usize,
218 h3_max_idle_timeout: Option<u64>,
219 h3_fingerprint: Option<Http3Fingerprint>,
220 h3_backend: H3Backend,
221 h3_transport_config: H3TransportConfig,
222 h2_transport_config: H2TransportConfig,
223 h2_direct_streaming_responses: bool,
224 tcp_keepalive: Option<Duration>,
225 tcp_keepalive_interval: Option<Duration>,
226 tcp_keepalive_retries: Option<u32>,
227 tcp_fingerprint: Option<TcpFingerprint>,
228 prefer_http2: bool,
229 h3_upgrade_enabled: bool,
230 http2_prior_knowledge: bool,
231 root_certs: Vec<Vec<u8>>,
232 use_platform_roots: bool,
234 danger_accept_invalid_certs: bool,
236 localhost_allows_invalid_certs: bool,
238 default_headers: Headers,
240 redirect_policy: RedirectPolicy,
242 cookie_store: Option<Arc<RwLock<CookieJar>>>,
244 http_tls_early_data: bool,
246}
247
248impl Client {
249 pub fn new() -> Result<Self> {
251 ClientBuilder::new().build()
252 }
253
254 pub fn builder() -> ClientBuilder {
256 ClientBuilder::new()
257 }
258
259 pub fn connection_reuse_count(&self) -> usize {
263 self.pool_reuse_counter.load(Ordering::Relaxed)
264 }
265
266 pub fn get(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
268 RequestBuilder::new(self, Method::GET, url)
269 }
270
271 pub fn post(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
273 RequestBuilder::new(self, Method::POST, url)
274 }
275
276 pub fn put(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
278 RequestBuilder::new(self, Method::PUT, url)
279 }
280
281 pub fn delete(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
283 RequestBuilder::new(self, Method::DELETE, url)
284 }
285
286 pub fn head(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
288 RequestBuilder::new(self, Method::HEAD, url)
289 }
290
291 pub fn patch(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
293 RequestBuilder::new(self, Method::PATCH, url)
294 }
295
296 pub fn request(&self, method: Method, url: impl IntoUrl) -> RequestBuilder<'_> {
298 RequestBuilder::new(self, method, url)
299 }
300
301 pub fn websocket_h2(&self, url: impl IntoUrl) -> WebSocketH2Builder<'_> {
303 WebSocketH2Builder::new(self, url)
304 }
305
306 pub fn websocket_h3(&self, url: impl IntoUrl) -> WebSocketH3Builder<'_> {
308 WebSocketH3Builder::new(self, url)
309 }
310
311 pub fn websocket(&self, url: impl IntoUrl) -> WebSocketBuilder<'_> {
313 Client::websocket_with_parts(
314 WebSocketClientParts {
315 connector: &self.connector,
316 insecure_connector: &self.insecure_connector,
317 default_headers: &self.default_headers,
318 timeouts: &self.timeouts,
319 cookie_store: self.cookie_store.as_ref(),
320 danger_accept_invalid_certs: self.danger_accept_invalid_certs,
321 localhost_allows_invalid_certs: self.localhost_allows_invalid_certs,
322 },
323 url,
324 )
325 }
326
327 pub fn alt_svc_cache(&self) -> &Arc<AltSvcCache> {
329 &self.alt_svc_cache
330 }
331
332 pub fn h3_client(&self) -> &H3Client {
335 &self.h3_client
336 }
337
338 pub fn fingerprint_profile(&self) -> FingerprintProfile {
340 self.fingerprint
341 }
342
343 pub fn http2_settings(&self) -> &Http2Settings {
345 &self.http2_settings
346 }
347
348 pub fn pseudo_order(&self) -> PseudoHeaderOrder {
350 self.pseudo_order
351 }
352
353 pub fn default_headers(&self) -> &Headers {
355 &self.default_headers
356 }
357
358 pub fn h1_max_connections_per_origin(&self) -> usize {
360 self.h1_max_connections_per_origin
361 }
362
363 pub fn h2_max_concurrent_streams_per_connection(&self) -> Option<u32> {
365 self.h2_transport_config
366 .max_concurrent_streams_per_connection
367 }
368
369 pub fn h2_streaming_body_buffer_slots(&self) -> usize {
371 self.h2_transport_config.streaming_body_buffer_slots
372 }
373
374 pub fn http2_keep_alive_interval(&self) -> Option<Duration> {
376 self.h2_transport_config.keep_alive_interval
377 }
378
379 pub fn http2_keep_alive_while_idle(&self) -> bool {
381 self.h2_transport_config.keep_alive_while_idle
382 }
383
384 pub fn h3_streaming_body_buffer_slots(&self) -> usize {
386 self.h3_client.streaming_body_buffer_slots()
387 }
388
389 pub fn h3_tunnel_outbound_byte_budget(&self) -> usize {
391 self.h3_client.tunnel_outbound_byte_budget()
392 }
393
394 pub fn h3_tunnel_inbound_byte_budget(&self) -> usize {
396 self.h3_client.tunnel_inbound_byte_budget()
397 }
398
399 async fn acquire_h1_connection_slot(
400 &self,
401 key: &PoolKey,
402 timeouts: &Timeouts,
403 ) -> Result<Option<OwnedSemaphorePermit>> {
404 if self.h1_max_connections_per_origin == 0 {
405 return Ok(None);
406 }
407
408 let semaphore = {
409 let mut slots = self.h1_connection_slots.write().await;
410 slots
411 .entry(key.clone())
412 .or_insert_with(|| Arc::new(Semaphore::new(self.h1_max_connections_per_origin)))
413 .clone()
414 };
415
416 let acquire = semaphore.acquire_owned();
417 let permit = if let Some(pool_acquire_timeout) = timeouts.pool_acquire {
418 tokio_timeout(pool_acquire_timeout, acquire)
419 .await
420 .map_err(|_| Error::PoolAcquireTimeout(pool_acquire_timeout))?
421 } else {
422 acquire.await
423 }
424 .map_err(|_| Error::Connection("HTTP/1.1 connection scheduler closed".into()))?;
425
426 Ok(Some(permit))
427 }
428
429 fn is_localhost(host: &str) -> bool {
431 host == "localhost" || host == "127.0.0.1" || host == "::1"
432 }
433
434 fn connector_for_uri(&self, uri: &Uri) -> &BoringConnector {
436 if self.danger_accept_invalid_certs {
438 return &self.insecure_connector;
439 }
440
441 if self.localhost_allows_invalid_certs {
443 if let Some(host) = uri.host() {
444 if Self::is_localhost(host) {
445 return &self.insecure_connector;
446 }
447 }
448 }
449
450 &self.connector
451 }
452}
453
454impl<'a> WebSocketH2Builder<'a> {
455 fn new(client: &'a Client, url: impl IntoUrl) -> Self {
456 let mut error = None;
457 let url = match url.into_url() {
458 Ok(url) => Some(url),
459 Err(err) => {
460 error = Some(err);
461 None
462 }
463 };
464
465 Self {
466 client,
467 url,
468 headers: client.default_headers.clone(),
469 error,
470 }
471 }
472
473 pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
475 self.headers.insert(key, value);
476 self
477 }
478
479 pub fn headers(mut self, headers: impl Into<Headers>) -> Self {
481 self.headers = headers.into();
482 self
483 }
484
485 pub async fn open(self) -> Result<H2Tunnel> {
487 if let Some(err) = self.error {
488 return Err(err);
489 }
490
491 let url = self.url.ok_or_else(|| Error::missing("websocket URL"))?;
492
493 let websocket_scheme = url.scheme();
494 let h2_scheme = match websocket_scheme {
495 "wss" => "https",
496 "ws" => {
497 if !self.client.http2_prior_knowledge {
498 return Err(Error::WebSocketUnsupported(
499 "ws:// RFC 8441 requires explicit HTTP/2 prior knowledge".into(),
500 ));
501 }
502 "http"
503 }
504 other => {
505 return Err(Error::WebSocketUnsupported(format!(
506 "RFC 8441 requires ws:// or wss:// URL, got {other}"
507 )));
508 }
509 };
510
511 let mut h2_url = url.clone();
512 h2_url
513 .set_scheme(h2_scheme)
514 .map_err(|_| Error::WebSocketUnsupported("invalid WebSocket URL scheme".into()))?;
515
516 let uri: Uri = h2_url
517 .as_str()
518 .parse()
519 .map_err(|e| Error::HttpProtocol(format!("Invalid URI: {}", e)))?;
520
521 let headers = self.headers.clone();
522 let pool_key = self.client.make_pool_key(&uri);
523
524 if let Some(conn) = {
525 let pool = self.client.h2_pool.read().await;
526 pool.get(&pool_key).cloned()
527 } {
528 match conn.open_websocket_tunnel(uri.clone(), &headers).await {
529 Ok(tunnel) => return Ok(tunnel),
530 Err(err) => {
531 tracing::debug!("Pooled RFC 8441 tunnel open failed, reconnecting: {}", err);
532 let mut pool = self.client.h2_pool.write().await;
533 pool.remove(&pool_key);
534 }
535 }
536 }
537
538 let connector = self.client.connector_for_uri(&uri);
539 let stream = connector.connect(&uri).await?;
540
541 let use_http2 = if websocket_scheme == "ws" && self.client.http2_prior_knowledge {
542 true
543 } else if let MaybeHttpsStream::Https(ref ssl_stream) = stream {
544 ssl_stream.ssl().selected_alpn_protocol() == Some(b"h2")
545 } else {
546 false
547 };
548
549 if !use_http2 {
550 return Err(Error::WebSocketUnsupported(
551 "RFC 8441 WebSocket requires ALPN h2 or explicit HTTP/2 prior knowledge".into(),
552 ));
553 }
554
555 let h2_conn = H2Connection::connect(
556 stream,
557 self.client.http2_settings.clone(),
558 self.client.pseudo_order,
559 )
560 .await?;
561 let pooled_conn =
562 H2PooledConnection::new_with_config(h2_conn, self.client.h2_transport_config.clone());
563
564 {
565 let mut pool = self.client.h2_pool.write().await;
566 pool.insert(pool_key, pooled_conn.clone());
567 }
568
569 pooled_conn.open_websocket_tunnel(uri, &headers).await
570 }
571}
572
573impl<'a> WebSocketH3Builder<'a> {
574 fn new(client: &'a Client, url: impl IntoUrl) -> Self {
575 let mut error = None;
576 let url = match url.into_url() {
577 Ok(url) => Some(url),
578 Err(err) => {
579 error = Some(err);
580 None
581 }
582 };
583
584 Self {
585 client,
586 url,
587 headers: client.default_headers.clone(),
588 error,
589 }
590 }
591
592 pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
594 self.headers.insert(key, value);
595 self
596 }
597
598 pub fn headers(mut self, headers: impl Into<Headers>) -> Self {
600 self.headers = headers.into();
601 self
602 }
603
604 pub async fn open(self) -> Result<H3Tunnel> {
606 if let Some(err) = self.error {
607 return Err(err);
608 }
609
610 let url = self.url.ok_or_else(|| Error::missing("websocket URL"))?;
611 if url.scheme() != "wss" {
612 return Err(Error::WebSocketUnsupported(
613 "RFC 9220 WebSocket over HTTP/3 requires wss://".into(),
614 ));
615 }
616
617 let mut h3_url = url.clone();
618 h3_url
619 .set_scheme("https")
620 .map_err(|_| Error::WebSocketUnsupported("invalid WebSocket URL scheme".into()))?;
621
622 let mut h3_client = self.client.h3_client.clone();
623 if self.client.danger_accept_invalid_certs
624 || (self.client.localhost_allows_invalid_certs
625 && h3_url.host_str().is_some_and(Client::is_localhost))
626 {
627 h3_client = h3_client.danger_accept_invalid_certs(true);
628 }
629
630 let fut = h3_client.open_websocket_tunnel(h3_url.as_str(), &self.headers);
631 if let Some(total_timeout) = self.client.timeouts.total {
632 tokio_timeout(total_timeout, fut)
633 .await
634 .map_err(|_| Error::TotalTimeout(total_timeout))?
635 } else {
636 fut.await
637 }
638 }
639}
640
641impl<'a> RequestBuilder<'a> {
642 fn new(client: &'a Client, method: Method, url: impl IntoUrl) -> Self {
643 let mut error = None;
644 let url = match url.into_url() {
645 Ok(url) => Some(url),
646 Err(err) => {
647 error = Some(err);
648 None
649 }
650 };
651
652 Self {
653 client,
654 url,
655 method,
656 headers: client.default_headers.clone(),
657 body: RequestBody::Empty,
658 version: None,
659 timeout: None,
660 error,
661 }
662 }
663
664 fn set_error(&mut self, error: Error) {
665 if self.error.is_none() {
666 self.error = Some(error);
667 }
668 }
669
670 fn ensure_content_type(&mut self, value: &str) {
671 if !self.headers.contains("content-type") {
672 self.headers.insert("Content-Type", value.to_string());
673 }
674 }
675
676 pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
678 self.headers.insert(key, value);
679 self
680 }
681
682 pub fn header_append(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
684 self.headers.append(key, value);
685 self
686 }
687
688 pub fn headers(mut self, headers: impl Into<Headers>) -> Self {
690 self.headers = headers.into();
691 self
692 }
693
694 pub fn body(mut self, body: impl Into<RequestBody>) -> Self {
698 self.body = body.into();
699 self
700 }
701
702 pub fn body_stream<S>(mut self, stream: S) -> Self
706 where
707 S: futures_core::Stream<Item = std::result::Result<Bytes, Error>> + Send + 'static,
708 {
709 self.body = RequestBody::Stream {
710 stream: Box::pin(stream),
711 content_length: None,
712 };
713 self
714 }
715
716 pub fn body_stream_sized<S>(mut self, stream: S, content_length: u64) -> Self
720 where
721 S: futures_core::Stream<Item = std::result::Result<Bytes, Error>> + Send + 'static,
722 {
723 self.body = RequestBody::Stream {
724 stream: Box::pin(stream),
725 content_length: Some(content_length),
726 };
727 self
728 }
729
730 pub fn query<T: Serialize + ?Sized>(mut self, query: &T) -> Self {
732 if self.error.is_some() {
733 return self;
734 }
735
736 let url = match self.url.as_mut() {
737 Some(url) => url,
738 None => return self,
739 };
740
741 match serde_urlencoded::to_string(query) {
742 Ok(encoded) => {
743 if !encoded.is_empty() {
744 let merged = match url.query() {
745 Some(existing) if !existing.is_empty() => {
746 format!("{}&{}", existing, encoded)
747 }
748 _ => encoded,
749 };
750 if let Err(err) = url.set_query(Some(&merged)) {
751 self.set_error(err.into());
752 }
753 }
754 }
755 Err(err) => self.set_error(err.into()),
756 }
757
758 self
759 }
760
761 pub fn json<T: Serialize + ?Sized>(mut self, json: &T) -> Self {
763 if self.error.is_some() {
764 return self;
765 }
766
767 match serde_json::to_vec(json) {
768 Ok(bytes) => {
769 self.body = RequestBody::Json(bytes);
770 self.ensure_content_type("application/json");
771 }
772 Err(err) => self.set_error(err.into()),
773 }
774
775 self
776 }
777
778 pub fn form<T: Serialize + ?Sized>(mut self, form: &T) -> Self {
780 if self.error.is_some() {
781 return self;
782 }
783
784 match serde_urlencoded::to_string(form) {
785 Ok(encoded) => {
786 self.body = RequestBody::Form(encoded);
787 self.ensure_content_type("application/x-www-form-urlencoded");
788 }
789 Err(err) => self.set_error(err.into()),
790 }
791
792 self
793 }
794
795 pub fn bearer_auth(mut self, token: impl AsRef<str>) -> Self {
797 self.headers
798 .insert("Authorization", format!("Bearer {}", token.as_ref()));
799 self
800 }
801
802 pub fn basic_auth<P: AsRef<str>>(
804 mut self,
805 username: impl AsRef<str>,
806 password: Option<P>,
807 ) -> Self {
808 let creds = match password {
809 Some(p) => format!("{}:{}", username.as_ref(), p.as_ref()),
810 None => format!("{}:", username.as_ref()),
811 };
812 let encoded = base64::engine::general_purpose::STANDARD.encode(creds.as_bytes());
813 self.headers
814 .insert("Authorization", format!("Basic {}", encoded));
815 self
816 }
817
818 pub fn timeout(mut self, timeout: Duration) -> Self {
820 self.timeout = Some(timeout);
821 self
822 }
823
824 pub fn version(mut self, version: HttpVersion) -> Self {
826 self.version = Some(version);
827 self
828 }
829
830 pub fn build(self) -> Result<Request> {
832 if let Some(error) = self.error {
833 return Err(error);
834 }
835
836 let url = self.url.ok_or_else(|| Error::missing("url"))?;
837
838 Ok(Request {
839 method: self.method,
840 url,
841 headers: self.headers,
842 body: self.body,
843 version: self.version,
844 timeout: self.timeout,
845 })
846 }
847
848 pub async fn send(self) -> Result<Response> {
850 let client = self.client.clone();
851 let request = self.build()?;
852 if request.body.is_streaming() {
853 return Err(Error::HttpProtocol(
854 "streaming request bodies require send_streaming()".into(),
855 ));
856 }
857 client.execute(request).await
858 }
859
860 pub async fn send_streaming(self) -> Result<Response> {
864 let policy = self.client.redirect_policy.clone();
865 if matches!(policy, RedirectPolicy::None) {
866 return self.send_streaming_once().await;
867 }
868
869 if self.body.is_streaming() {
870 let mut response = self.send_streaming_once().await?;
871 if response.is_redirect() {
872 drain_streaming_body(response.body_mut()).await?;
873 return Err(Error::HttpProtocol(
874 "redirect would require replaying a non-replayable streaming request body"
875 .into(),
876 ));
877 }
878 return Ok(response);
879 }
880
881 let client = self.client;
882 let mut request = self.build()?;
883 let mut redirects = 0u32;
884
885 loop {
886 let builder = RequestBuilder {
887 client,
888 url: Some(request.url.clone()),
889 method: request.method.clone(),
890 headers: request.headers.clone(),
891 body: request.body.clone(),
892 version: request.version,
893 timeout: request.timeout,
894 error: None,
895 };
896
897 let mut response = builder.send_streaming_once().await?;
898
899 if !response.is_redirect() {
900 return Ok(response);
901 }
902
903 let location = match response.redirect_url() {
904 Some(value) => value.to_string(),
905 None => return Ok(response),
906 };
907
908 if let RedirectPolicy::Limited(limit) = policy {
909 if redirects >= limit {
910 return Err(Error::RedirectLimit { count: limit });
911 }
912 }
913
914 drain_streaming_body(response.body_mut()).await?;
915
916 let next_url = request.url.join(&location).map_err(Error::from)?;
917 request = client.redirect_request(&request, &response, next_url)?;
918 redirects += 1;
919 }
920 }
921
922 async fn send_streaming_once(self) -> Result<Response> {
923 let client = self.client.clone();
924 let mut request = self.build()?;
925 let mut timeouts = client.timeouts.clone();
926 if let Some(total) = request.timeout {
927 timeouts.total = Some(total);
928 }
929
930 client
931 .apply_cookie_header_for_url(request.url.as_str(), &mut request.headers)
932 .await;
933
934 let version = request.version.unwrap_or(client.default_version);
935
936 if matches!(version, HttpVersion::Http3 | HttpVersion::Http3Only) {
937 if let Some(content_length) = request.body.content_length() {
938 if content_length > 0 && !request.headers.contains("content-length") {
939 request
940 .headers
941 .insert("Content-Length", content_length.to_string());
942 }
943 }
944 let body = if request.body.is_streaming() {
945 std::mem::take(&mut request.body)
946 } else {
947 request.body.clone()
948 };
949 let body_timeouts = crate::transport::h3::H3BodyTimeouts {
950 read_idle: timeouts.read_idle,
951 total: timeouts.total,
952 };
953
954 let fut = client.h3_client.send_streaming_with_timeouts(
955 request.url.as_str(),
956 request.method.as_str(),
957 &request.headers,
958 body,
959 body_timeouts,
960 );
961
962 let response = if let Some(ttfb_timeout) = timeouts.ttfb {
963 tokio_timeout(ttfb_timeout, fut)
964 .await
965 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))??
966 } else if let Some(total_timeout) = timeouts.total {
967 tokio_timeout(total_timeout, fut)
968 .await
969 .map_err(|_| Error::TotalTimeout(total_timeout))??
970 } else {
971 fut.await?
972 };
973
974 let request_url = request.url.clone();
975 client
976 .store_cookies_from_headers(response.headers(), request_url.as_str())
977 .await;
978 let response = response.with_url(request_url);
979
980 if let Some(enc) = response.content_encoding() {
981 let enc_lc = enc.to_lowercase();
982 if enc_lc.contains("gzip")
983 || enc_lc.contains("deflate")
984 || enc_lc.contains("br")
985 || enc_lc.contains("zstd")
986 {
987 return Err(Error::Decompression(
988 "Compressed streaming is unsupported".into(),
989 ));
990 }
991 }
992
993 return Ok(response);
994 }
995
996 let uri: Uri = request
998 .url
999 .as_str()
1000 .parse()
1001 .map_err(|e| Error::HttpProtocol(format!("Invalid URI: {}", e)))?;
1002
1003 let request_url = request.url.clone();
1004 let prefer_http2 = match version {
1005 HttpVersion::Http1_1 => false,
1006 HttpVersion::Http2 => true,
1007 HttpVersion::Auto => matches!(client.default_version, HttpVersion::Http2),
1008 HttpVersion::Http3 | HttpVersion::Http3Only => unreachable!(),
1009 };
1010 let pool_key = client.make_pool_key(&uri);
1011
1012 let response = if !prefer_http2 {
1013 let h1_slot = client
1014 .acquire_h1_connection_slot(&pool_key, &timeouts)
1015 .await?;
1016 let pooled_h1_stream = client.h1_pool.get_h1(&pool_key).await;
1017 if pooled_h1_stream.is_some() {
1018 client.pool_reuse_counter.fetch_add(1, Ordering::Relaxed);
1019 }
1020 let connector = client.connector_for_uri(&uri);
1021 let method = request.method.clone();
1022 let headers = request.headers.clone();
1023 let body = request.body;
1024 let use_early_data = client.http_tls_early_data
1025 && uri.scheme_str() == Some("https")
1026 && is_zero_rtt_safe_request(method.as_str(), &body);
1027
1028 let (stream, early_outcome) = if let Some(stream) = pooled_h1_stream {
1029 (stream, EarlyDataOutcome::NotAttempted)
1030 } else {
1031 let connect_result = if use_early_data {
1032 let body_kind = h1_request_body_kind(&body);
1033 let request_head =
1034 H1Connection::build_request_bytes(&method, &uri, &headers, body_kind)?;
1035 let connect_fut = connector.connect_with_alpn_and_early_data(
1036 &uri,
1037 AlpnMode::Http1Only,
1038 Some(&request_head),
1039 );
1040 if let Some(connect_timeout) = timeouts.connect {
1041 tokio_timeout(connect_timeout, connect_fut)
1042 .await
1043 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1044 } else {
1045 connect_fut.await?
1046 }
1047 } else {
1048 let connect_fut = connector.connect_h1_only(&uri);
1049 let stream = if let Some(connect_timeout) = timeouts.connect {
1050 tokio_timeout(connect_timeout, connect_fut)
1051 .await
1052 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1053 } else {
1054 connect_fut.await?
1055 };
1056 (stream, EarlyDataOutcome::NotAttempted)
1057 };
1058 connect_result
1059 };
1060
1061 let request_head_sent = matches!(
1062 early_outcome,
1063 EarlyDataOutcome::Accepted | EarlyDataOutcome::Rejected { .. }
1064 );
1065
1066 let h1_pool = client.h1_pool.clone();
1067 let pool_key_for_reuse = pool_key.clone();
1068 let on_reusable: crate::transport::h1::H1ReuseHook = Box::new(move |stream| {
1069 let _h1_slot = h1_slot;
1070 let _ = h1_pool.try_put_h1(pool_key_for_reuse, stream);
1071 });
1072 let conn = H1Connection::new(stream);
1073 let send_fut = conn.send_request_streaming(
1074 method,
1075 &uri,
1076 &headers,
1077 body,
1078 H1StreamingOptions {
1079 on_reusable,
1080 read_idle_timeout: timeouts.read_idle,
1081 total_timeout: timeouts.total,
1082 request_head_sent,
1083 },
1084 );
1085 let response = if let Some(ttfb_timeout) = timeouts.ttfb {
1086 tokio_timeout(ttfb_timeout, send_fut)
1087 .await
1088 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))??
1089 } else {
1090 send_fut.await?
1091 };
1092
1093 client
1094 .store_cookies_from_headers(response.headers(), request_url.as_str())
1095 .await;
1096 let response = response.with_url(request_url);
1097 reject_compressed_streaming(&response)?;
1098 return Ok(response);
1099 } else {
1100 if let Some(content_length) = request.body.content_length() {
1101 if content_length > 0 && !request.headers.contains("content-length") {
1102 request
1103 .headers
1104 .insert("Content-Length", content_length.to_string());
1105 }
1106 }
1107 let body_timeouts = H2BodyTimeouts {
1108 read_idle: timeouts.read_idle,
1109 total: timeouts.total,
1110 };
1111 let pooled = {
1113 let mut pool = client.h2_pool.write().await;
1114 if let Some(conn) = pool.get(&pool_key) {
1115 if conn.is_alive() {
1116 Some(conn.clone())
1117 } else {
1118 pool.remove(&pool_key);
1119 None
1120 }
1121 } else {
1122 None
1123 }
1124 };
1125
1126 if let Some(conn) = pooled {
1127 client.pool_reuse_counter.fetch_add(1, Ordering::Relaxed);
1128 let streaming_body = request.body.is_streaming();
1129 let body = if streaming_body {
1130 std::mem::take(&mut request.body)
1131 } else {
1132 request.body.clone()
1133 };
1134
1135 let send_fut = conn.send_streaming_request(
1136 request.method.clone(),
1137 &uri,
1138 &request.headers,
1139 body,
1140 body_timeouts,
1141 );
1142 let res = if let Some(ttfb_timeout) = timeouts.ttfb {
1143 tokio_timeout(ttfb_timeout, send_fut)
1144 .await
1145 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))?
1146 } else {
1147 send_fut.await
1148 };
1149
1150 match res {
1151 Ok(response) => {
1152 let response = response.with_url(request_url.clone());
1153 client
1154 .store_cookies_from_headers(response.headers(), request_url.as_str())
1155 .await;
1156 response
1157 }
1158 Err(e) => {
1159 if streaming_body {
1160 return Err(e);
1161 }
1162 tracing::debug!(
1163 "Pooled HTTP/2 connection failed for streaming, creating new: {}",
1164 e
1165 );
1166 let mut pool = client.h2_pool.write().await;
1167 pool.remove(&pool_key);
1168 drop(pool);
1169
1170 let connector = client.connector_for_uri(&uri);
1171 let connect_fut = connector.connect(&uri);
1172 let stream = if let Some(connect_timeout) = timeouts.connect {
1173 tokio_timeout(connect_timeout, connect_fut)
1174 .await
1175 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1176 } else {
1177 connect_fut.await?
1178 };
1179
1180 let alpn = stream.alpn_protocol();
1181 if !alpn.is_h2() {
1182 return Err(Error::HttpProtocol(format!(
1183 "Expected h2 ALPN, got {:?}",
1184 alpn
1185 )));
1186 }
1187
1188 let h2_connect_fut = H2Connection::connect(
1189 stream,
1190 client.http2_settings.clone(),
1191 client.pseudo_order,
1192 );
1193 let h2_conn = if let Some(connect_timeout) = timeouts.connect {
1194 tokio_timeout(connect_timeout, h2_connect_fut)
1195 .await
1196 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1197 } else {
1198 h2_connect_fut.await?
1199 };
1200
1201 let pooled_conn = H2PooledConnection::new_with_config(
1202 h2_conn,
1203 client.h2_transport_config.clone(),
1204 );
1205 {
1206 let mut pool = client.h2_pool.write().await;
1207 pool.insert(pool_key.clone(), pooled_conn.clone());
1208 }
1209
1210 let send_fut = pooled_conn.send_streaming_request(
1211 request.method.clone(),
1212 &uri,
1213 &request.headers,
1214 request.body.clone(),
1215 body_timeouts,
1216 );
1217 let response = if let Some(ttfb_timeout) = timeouts.ttfb {
1218 tokio_timeout(ttfb_timeout, send_fut)
1219 .await
1220 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))??
1221 } else {
1222 send_fut.await?
1223 };
1224
1225 let response = response.with_url(request_url.clone());
1226 client
1227 .store_cookies_from_headers(response.headers(), request_url.as_str())
1228 .await;
1229 response
1230 }
1231 }
1232 } else if client.h2_direct_streaming_responses && request.body.is_empty() {
1233 let response = client
1234 .send_h2_direct_streaming_response(
1235 request.method.clone(),
1236 &uri,
1237 request.headers.clone(),
1238 &pool_key,
1239 &timeouts,
1240 body_timeouts,
1241 )
1242 .await?;
1243
1244 client
1245 .store_cookies_from_headers(response.headers(), request_url.as_str())
1246 .await;
1247 response.with_url(request_url)
1248 } else {
1249 let connector = client.connector_for_uri(&uri);
1250 let connect_fut = connector.connect(&uri);
1251 let stream = if let Some(connect_timeout) = timeouts.connect {
1252 tokio_timeout(connect_timeout, connect_fut)
1253 .await
1254 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1255 } else {
1256 connect_fut.await?
1257 };
1258
1259 let alpn = stream.alpn_protocol();
1260 if !alpn.is_h2() {
1261 return Err(Error::HttpProtocol(format!(
1262 "Expected h2 ALPN, got {:?}",
1263 alpn
1264 )));
1265 }
1266
1267 let h2_connect_fut = H2Connection::connect(
1268 stream,
1269 client.http2_settings.clone(),
1270 client.pseudo_order,
1271 );
1272 let h2_conn = if let Some(connect_timeout) = timeouts.connect {
1273 tokio_timeout(connect_timeout, h2_connect_fut)
1274 .await
1275 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1276 } else {
1277 h2_connect_fut.await?
1278 };
1279
1280 let pooled_conn = H2PooledConnection::new_with_config(
1281 h2_conn,
1282 client.h2_transport_config.clone(),
1283 );
1284 {
1285 let mut pool = client.h2_pool.write().await;
1286 pool.insert(pool_key.clone(), pooled_conn.clone());
1287 }
1288
1289 let body = std::mem::take(&mut request.body);
1290
1291 let send_fut = pooled_conn.send_streaming_request(
1292 request.method.clone(),
1293 &uri,
1294 &request.headers,
1295 body,
1296 body_timeouts,
1297 );
1298 let response = if let Some(ttfb_timeout) = timeouts.ttfb {
1299 tokio_timeout(ttfb_timeout, send_fut)
1300 .await
1301 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))??
1302 } else {
1303 send_fut.await?
1304 };
1305
1306 client
1307 .store_cookies_from_headers(response.headers(), request_url.as_str())
1308 .await;
1309 response.with_url(request_url)
1310 }
1311 };
1312
1313 reject_compressed_streaming(&response)?;
1314 Ok(response)
1315 }
1316}
1317
1318fn reject_compressed_streaming(response: &Response) -> Result<()> {
1319 if let Some(enc) = response.content_encoding() {
1320 let enc_lc = enc.to_lowercase();
1321 if enc_lc.contains("gzip")
1322 || enc_lc.contains("deflate")
1323 || enc_lc.contains("br")
1324 || enc_lc.contains("zstd")
1325 {
1326 return Err(Error::Decompression(
1327 "Compressed streaming is unsupported".into(),
1328 ));
1329 }
1330 }
1331 Ok(())
1332}
1333
1334async fn drain_streaming_body(body: &mut Body) -> Result<()> {
1335 while let Some(frame) = body.frame().await {
1336 let _ = frame?;
1337 }
1338 Ok(())
1339}
1340
1341impl Client {
1342 pub async fn execute(&self, mut request: Request) -> Result<Response> {
1344 if request.body.is_streaming() {
1345 return Err(Error::HttpProtocol(
1346 "streaming request bodies require send_streaming()".into(),
1347 ));
1348 }
1349
1350 let policy = self.redirect_policy.clone();
1351 let mut redirects = 0u32;
1352
1353 loop {
1354 let mut headers = request.headers.clone();
1355 let cookie_injected = self
1356 .apply_cookie_header_for_url(request.url.as_str(), &mut headers)
1357 .await;
1358 request.headers = headers;
1359
1360 let mut timeouts = self.timeouts.clone();
1361 if let Some(total) = request.timeout {
1362 timeouts.total = Some(total);
1363 }
1364
1365 let response = self
1366 .execute_once(request.clone(), &timeouts)
1367 .await?
1368 .into_buffered()
1369 .await?;
1370
1371 self.store_cookies_from_headers(response.headers(), request.url.as_str())
1372 .await;
1373
1374 if matches!(policy, RedirectPolicy::None) || !response.is_redirect() {
1375 return Ok(response);
1376 }
1377
1378 let location = match response.redirect_url() {
1379 Some(value) => value.to_string(),
1380 None => return Ok(response),
1381 };
1382
1383 if let RedirectPolicy::Limited(limit) = policy {
1384 if redirects >= limit {
1385 return Err(Error::RedirectLimit { count: limit });
1386 }
1387 }
1388
1389 let next_url = request.url.join(&location).map_err(Error::from)?;
1390 let mut next_request = self.redirect_request(&request, &response, next_url)?;
1391
1392 if cookie_injected {
1393 next_request.headers.remove("cookie");
1394 }
1395
1396 request = next_request;
1397 redirects += 1;
1398 }
1399 }
1400
1401 async fn execute_once(&self, request: Request, timeouts: &Timeouts) -> Result<Response> {
1402 let version = request.version.unwrap_or(self.default_version);
1403
1404 if matches!(version, HttpVersion::Http3Only) {
1406 return self
1407 .send_h3_for_url(request.clone(), request.url.clone(), timeouts)
1408 .await;
1409 }
1410
1411 if matches!(version, HttpVersion::Http3) {
1413 match self
1414 .send_h3_for_url(request.clone(), request.url.clone(), timeouts)
1415 .await
1416 {
1417 Ok(response) => return Ok(response),
1418 Err(e) => {
1419 tracing::debug!("HTTP/3 failed, falling back to HTTP/1.1 or HTTP/2: {}", e);
1420 }
1422 }
1423 }
1424
1425 if matches!(version, HttpVersion::Auto) && self.h3_upgrade_enabled {
1427 let origin = Self::origin_for_url(&request.url);
1428 if let Some(alt_svc) = self.alt_svc_cache.get_h3_alternative(&origin).await {
1429 tracing::debug!(
1430 "Alt-Svc indicates HTTP/3 support for {}, attempting upgrade",
1431 origin
1432 );
1433
1434 let mut h3_url = request.url.clone();
1435 let _ = h3_url.set_scheme("https");
1436 if let Some(ref host) = alt_svc.host {
1437 h3_url
1438 .set_host(Some(host))
1439 .map_err(|_| Error::HttpProtocol("Invalid Alt-Svc host".into()))?;
1440 }
1441 let _ = h3_url.set_port(Some(alt_svc.port));
1442
1443 match self
1444 .send_h3_for_url(request.clone(), h3_url.clone(), timeouts)
1445 .await
1446 {
1447 Ok(response) => return Ok(response.with_url(h3_url)),
1448 Err(e) => {
1449 tracing::debug!("HTTP/3 upgrade failed, using HTTP/1.1 or HTTP/2: {}", e);
1450 }
1452 }
1453 }
1454 }
1455
1456 self.send_h1_h2(request, version, timeouts).await
1458 }
1459
1460 async fn send_h3_for_url(
1461 &self,
1462 request: Request,
1463 url: Url,
1464 timeouts: &Timeouts,
1465 ) -> Result<Response> {
1466 let body = if request.body.is_empty() {
1467 None
1468 } else {
1469 Some(request.body.clone().into_bytes()?.to_vec())
1470 };
1471
1472 let fut = self.h3_client.send_request(
1473 url.as_str(),
1474 request.method.as_str(),
1475 &request.headers,
1476 body,
1477 );
1478
1479 let response = if let Some(total_timeout) = timeouts.total {
1481 tokio_timeout(total_timeout, fut)
1482 .await
1483 .map_err(|_| Error::TotalTimeout(total_timeout))??
1484 } else {
1485 fut.await?
1486 };
1487
1488 Ok(response.with_url(url))
1489 }
1490
1491 async fn send_h1_h2(
1492 &self,
1493 request: Request,
1494 version: HttpVersion,
1495 timeouts: &Timeouts,
1496 ) -> Result<Response> {
1497 let request_url = request.url.clone();
1499
1500 let uri: Uri = request
1502 .url
1503 .as_str()
1504 .parse()
1505 .map_err(|e| Error::HttpProtocol(format!("Invalid URI: {}", e)))?;
1506
1507 let prefer_http2 = match version {
1509 HttpVersion::Http1_1 => false,
1510 HttpVersion::Http2 => true,
1511 HttpVersion::Http3 | HttpVersion::Http3Only => {
1512 return Err(Error::HttpProtocol("HTTP/3 should use send_h3".into()));
1513 }
1514 HttpVersion::Auto => matches!(self.default_version, HttpVersion::Http2),
1515 };
1516
1517 let h3_upgrade_enabled = self.h3_upgrade_enabled;
1519 let alt_svc_cache = self.alt_svc_cache.clone();
1520 let origin = Self::origin_for_url(&request.url);
1521
1522 let headers_vec = request.headers.clone();
1523 let body_bytes = if request.body.is_empty() {
1524 None
1525 } else {
1526 Some(request.body.clone().into_bytes()?)
1527 };
1528
1529 if prefer_http2 {
1531 let pool_key = self.make_pool_key(&uri);
1532
1533 let pooled = {
1535 let mut pool = self.h2_pool.write().await;
1536 if let Some(conn) = pool.get(&pool_key) {
1537 if conn.is_alive() {
1538 Some(conn.clone())
1539 } else {
1540 pool.remove(&pool_key);
1541 None
1542 }
1543 } else {
1544 None
1545 }
1546 };
1547
1548 if let Some(conn) = pooled {
1549 self.pool_reuse_counter.fetch_add(1, Ordering::Relaxed);
1550 let result = conn
1552 .send_request(
1553 request.method.clone(),
1554 &uri,
1555 &headers_vec,
1556 body_bytes.clone(),
1557 )
1558 .await;
1559
1560 match result {
1561 Ok(response) => {
1562 if h3_upgrade_enabled {
1564 if let Some(alt_svc) = response.get_header("alt-svc") {
1565 alt_svc_cache.parse_and_store(&origin, alt_svc).await;
1566 }
1567 }
1568 return Ok(response.with_url(request_url));
1569 }
1570 Err(e) => {
1571 tracing::debug!("Pooled HTTP/2 connection failed, creating new: {}", e);
1573 let mut pool = self.h2_pool.write().await;
1574 pool.remove(&pool_key);
1575 }
1576 }
1577 }
1578
1579 let connector = self.connector_for_uri(&uri);
1582 let connect_fut = connector.connect(&uri);
1583 let stream = if let Some(connect_timeout) = timeouts.connect {
1584 tokio_timeout(connect_timeout, connect_fut)
1585 .await
1586 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1587 } else {
1588 connect_fut.await?
1589 };
1590
1591 let use_http2 = if self.http2_prior_knowledge && !stream.alpn_protocol().is_h2() {
1593 true
1595 } else if let MaybeHttpsStream::Https(ref ssl_stream) = stream {
1596 ssl_stream.ssl().selected_alpn_protocol() == Some(b"h2")
1597 } else {
1598 false
1599 };
1600
1601 if use_http2 {
1602 let h2_conn =
1604 H2Connection::connect(stream, self.http2_settings.clone(), self.pseudo_order)
1605 .await?;
1606 let pooled_conn =
1607 H2PooledConnection::new_with_config(h2_conn, self.h2_transport_config.clone());
1608
1609 {
1611 let mut pool = self.h2_pool.write().await;
1612 pool.insert(pool_key, pooled_conn.clone());
1613 }
1614
1615 let fut = pooled_conn.send_request(
1617 request.method.clone(),
1618 &uri,
1619 &headers_vec,
1620 body_bytes.clone(),
1621 );
1622
1623 let response = if let Some(ttfb_timeout) = timeouts.ttfb {
1624 tokio_timeout(ttfb_timeout, fut)
1625 .await
1626 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))?
1627 } else {
1628 fut.await
1629 }?;
1630
1631 if h3_upgrade_enabled {
1633 if let Some(alt_svc) = response.get_header("alt-svc") {
1634 alt_svc_cache.parse_and_store(&origin, alt_svc).await;
1635 }
1636 }
1637
1638 return Ok(response.with_url(request_url));
1639 }
1640 }
1642
1643 let pool_key = self.make_pool_key(&uri);
1645 let h1_slot = self.acquire_h1_connection_slot(&pool_key, timeouts).await?;
1646
1647 let mut stream_opt = self.h1_pool.get_h1(&pool_key).await;
1649 let mut used_pooled = stream_opt.is_some();
1650 if used_pooled {
1651 self.pool_reuse_counter.fetch_add(1, Ordering::Relaxed);
1652 }
1653
1654 let mut stream = if let Some(pooled_stream) = stream_opt.take() {
1656 tracing::debug!("H1: Reusing pooled connection for {:?}", pool_key);
1657 pooled_stream
1658 } else {
1659 tracing::debug!("H1: Creating new connection for {:?}", pool_key);
1660 let connector = self.connector_for_uri(&uri);
1662 let connect_fut = connector.connect(&uri);
1663 if let Some(connect_timeout) = timeouts.connect {
1664 tokio_timeout(connect_timeout, connect_fut)
1665 .await
1666 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1667 } else {
1668 connect_fut.await?
1669 }
1670 };
1671
1672 let server_wants_h2 = if let MaybeHttpsStream::Https(ref ssl_stream) = stream {
1675 ssl_stream.ssl().selected_alpn_protocol() == Some(b"h2")
1676 } else {
1677 false
1678 };
1679
1680 let response = if server_wants_h2 {
1681 drop(h1_slot);
1682 tracing::debug!("Server selected h2 via ALPN, upgrading to HTTP/2");
1684
1685 let h2_conn =
1686 H2Connection::connect(stream, self.http2_settings.clone(), self.pseudo_order)
1687 .await?;
1688 let pooled_conn =
1689 H2PooledConnection::new_with_config(h2_conn, self.h2_transport_config.clone());
1690
1691 {
1693 let mut pool = self.h2_pool.write().await;
1694 pool.insert(pool_key, pooled_conn.clone());
1695 }
1696
1697 let fut = pooled_conn.send_request(
1699 request.method.clone(),
1700 &uri,
1701 &headers_vec,
1702 body_bytes.clone(),
1703 );
1704
1705 if let Some(ttfb_timeout) = timeouts.ttfb {
1706 tokio_timeout(ttfb_timeout, fut)
1707 .await
1708 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))?
1709 } else {
1710 fut.await
1711 }?
1712 } else {
1713 let _h1_slot = h1_slot;
1714 let result = loop {
1718 let stream_for_request = stream;
1719 let body_bytes = body_bytes.clone();
1720 let fut = Self::do_send_http1(
1721 stream_for_request,
1722 request.method.clone(),
1723 &uri,
1724 request.headers.clone(),
1725 body_bytes.clone(),
1726 );
1727
1728 let request_result = if let Some(ttfb_timeout) = timeouts.ttfb {
1730 tokio_timeout(ttfb_timeout, fut)
1731 .await
1732 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))?
1733 } else {
1734 fut.await
1735 };
1736
1737 match request_result {
1738 Ok((resp, returned_stream)) => {
1739 self.h1_pool.put_h1(pool_key.clone(), returned_stream).await;
1741 break Ok(resp);
1742 }
1743 Err(e) => {
1744 if used_pooled {
1746 tracing::debug!(
1747 "H1: Pooled connection failed for {:?}, creating new: {}",
1748 pool_key,
1749 e
1750 );
1751 let connector = self.connector_for_uri(&uri);
1753 let connect_fut = connector.connect(&uri);
1754 stream = if let Some(connect_timeout) = timeouts.connect {
1755 tokio_timeout(connect_timeout, connect_fut)
1756 .await
1757 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1758 } else {
1759 connect_fut.await?
1760 };
1761 used_pooled = false; continue;
1763 } else {
1764 tracing::debug!(
1766 "H1: Request failed for {:?}, discarding connection: {}",
1767 pool_key,
1768 e
1769 );
1770 break Err(e);
1771 }
1772 }
1773 }
1774 };
1775
1776 result?
1777 };
1778
1779 if h3_upgrade_enabled {
1781 if let Some(alt_svc) = response.get_header("alt-svc") {
1782 alt_svc_cache.parse_and_store(&origin, alt_svc).await;
1783 }
1784 }
1785
1786 Ok(response.with_url(request_url))
1787 }
1788
1789 fn redirect_request(
1790 &self,
1791 request: &Request,
1792 response: &Response,
1793 next_url: Url,
1794 ) -> Result<Request> {
1795 let status = response.status().as_u16();
1796 let mut method = request.method.clone();
1797 let mut headers = request.headers.clone();
1798
1799 let should_switch = status == 303
1800 || ((status == 301 || status == 302) && !matches!(method, Method::GET | Method::HEAD));
1801
1802 let body = if should_switch {
1803 method = Method::GET;
1804 headers.remove("content-length");
1805 headers.remove("content-type");
1806 RequestBody::Empty
1807 } else if request.body.is_streaming() {
1808 return Err(Error::HttpProtocol(
1809 "redirect would require replaying a non-replayable streaming request body".into(),
1810 ));
1811 } else {
1812 request.body.clone()
1813 };
1814
1815 if Self::is_cross_origin(&request.url, &next_url) {
1816 headers.remove("authorization");
1817 }
1818
1819 Ok(Request {
1820 method,
1821 url: next_url,
1822 headers,
1823 body,
1824 version: request.version,
1825 timeout: request.timeout,
1826 })
1827 }
1828
1829 async fn apply_cookie_header_for_url(&self, request_url: &str, headers: &mut Headers) -> bool {
1830 let Some(jar) = &self.cookie_store else {
1831 return false;
1832 };
1833 if headers.contains("cookie") {
1834 return false;
1835 }
1836
1837 let cookie_header = jar.read().await.build_cookie_header(request_url);
1838 if let Some(cookie_header) = cookie_header {
1839 headers.insert("Cookie", cookie_header);
1840 return true;
1841 }
1842 false
1843 }
1844
1845 async fn store_cookies_from_headers(&self, headers: &Headers, request_url: &str) {
1846 if let Some(jar) = &self.cookie_store {
1847 jar.write().await.store_from_headers(headers, request_url);
1848 }
1849 }
1850
1851 fn make_pool_key(&self, uri: &Uri) -> PoolKey {
1853 let host = uri.host().unwrap_or("localhost").to_string();
1854 let is_https = uri.scheme_str() == Some("https");
1855 let port = uri.port_u16().unwrap_or(if is_https { 443 } else { 80 });
1856 PoolKey::new(host, port, is_https, self.fingerprint, self.pseudo_order)
1857 }
1858
1859 fn take_h2_direct_connection(
1860 &self,
1861 pool_key: &PoolKey,
1862 ) -> Option<RawH2Connection<MaybeHttpsStream>> {
1863 let mut pool = self
1864 .h2_direct_pool
1865 .lock()
1866 .expect("H2 direct pool mutex poisoned");
1867 let conn = pool.get_mut(pool_key).and_then(Vec::pop);
1868 if pool.get(pool_key).is_some_and(Vec::is_empty) {
1869 pool.remove(pool_key);
1870 }
1871 conn
1872 }
1873
1874 fn h2_direct_reuse_hook(&self, pool_key: PoolKey) -> H2DirectReuseHook {
1875 let pool = self.h2_direct_pool.clone();
1876 Box::new(move |conn| {
1877 if !conn.is_reusable() {
1878 return;
1879 }
1880 let mut guard = pool.lock().expect("H2 direct pool mutex poisoned");
1881 let entry = guard.entry(pool_key).or_default();
1882 if entry.is_empty() {
1883 entry.push(conn);
1884 }
1885 })
1886 }
1887
1888 async fn connect_h2_direct_connection(
1889 &self,
1890 uri: &Uri,
1891 timeouts: &Timeouts,
1892 ) -> Result<RawH2Connection<MaybeHttpsStream>> {
1893 let connector = self.connector_for_uri(uri);
1894 let connect_fut = connector.connect(uri);
1895 let stream = if let Some(connect_timeout) = timeouts.connect {
1896 tokio_timeout(connect_timeout, connect_fut)
1897 .await
1898 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1899 } else {
1900 connect_fut.await?
1901 };
1902
1903 let use_http2 = if self.http2_prior_knowledge && !stream.alpn_protocol().is_h2() {
1904 true
1905 } else if let MaybeHttpsStream::Https(ref ssl_stream) = stream {
1906 ssl_stream.ssl().selected_alpn_protocol() == Some(b"h2")
1907 } else {
1908 false
1909 };
1910
1911 if !use_http2 {
1912 return Err(Error::HttpProtocol(format!(
1913 "Expected h2 ALPN, got {:?}",
1914 stream.alpn_protocol()
1915 )));
1916 }
1917
1918 let h2_connect_fut =
1919 RawH2Connection::connect(stream, self.http2_settings.clone(), self.pseudo_order);
1920 if let Some(connect_timeout) = timeouts.connect {
1921 tokio_timeout(connect_timeout, h2_connect_fut)
1922 .await
1923 .map_err(|_| Error::ConnectTimeout(connect_timeout))?
1924 } else {
1925 h2_connect_fut.await
1926 }
1927 }
1928
1929 async fn start_h2_direct_response(&self, request: H2DirectResponseRequest) -> Result<Response> {
1930 let H2DirectResponseRequest {
1931 conn,
1932 method,
1933 uri,
1934 headers,
1935 body_timeouts,
1936 pool_key,
1937 ttfb_timeout,
1938 } = request;
1939 let fut = async move {
1940 let mut conn = conn;
1941 let stream_id = conn.send_headers_raw(&method, &uri, &headers, true).await?;
1942 let (status, headers, end_stream) = conn
1943 .read_response_headers_with_end_stream(stream_id)
1944 .await?;
1945 Ok::<_, Error>(H2DirectStart {
1946 conn,
1947 stream_id,
1948 status: status.as_u16(),
1949 headers,
1950 end_stream,
1951 })
1952 };
1953
1954 let mut started = if let Some(timeout) = ttfb_timeout {
1955 tokio_timeout(timeout, fut)
1956 .await
1957 .map_err(|_| Error::TtfbTimeout(timeout))??
1958 } else {
1959 fut.await?
1960 };
1961
1962 if started.end_stream {
1963 started.conn.remove_stream(started.stream_id);
1964 let on_reusable = self.h2_direct_reuse_hook(pool_key);
1965 on_reusable(started.conn);
1966 return Ok(Response::with_body(
1967 started.status,
1968 Headers::from(started.headers),
1969 Body::empty(),
1970 "HTTP/2".to_string(),
1971 ));
1972 }
1973
1974 let on_reusable = self.h2_direct_reuse_hook(pool_key);
1975 Ok(Response::with_body(
1976 started.status,
1977 Headers::from(started.headers),
1978 Body::from_h2_direct(H2DirectBody::new(
1979 started.conn,
1980 started.stream_id,
1981 body_timeouts,
1982 on_reusable,
1983 )),
1984 "HTTP/2".to_string(),
1985 ))
1986 }
1987
1988 async fn send_h2_direct_streaming_response(
1989 &self,
1990 method: Method,
1991 uri: &Uri,
1992 headers: Headers,
1993 pool_key: &PoolKey,
1994 timeouts: &Timeouts,
1995 body_timeouts: H2BodyTimeouts,
1996 ) -> Result<Response> {
1997 if let Some(conn) = self.take_h2_direct_connection(pool_key) {
1998 self.pool_reuse_counter.fetch_add(1, Ordering::Relaxed);
1999 match self
2000 .start_h2_direct_response(H2DirectResponseRequest {
2001 conn,
2002 method: method.clone(),
2003 uri: uri.clone(),
2004 headers: headers.clone(),
2005 body_timeouts,
2006 pool_key: pool_key.clone(),
2007 ttfb_timeout: timeouts.ttfb,
2008 })
2009 .await
2010 {
2011 Ok(response) => return Ok(response),
2012 Err(error) => {
2013 tracing::debug!(
2014 "Pooled direct HTTP/2 streaming connection failed, reconnecting: {}",
2015 error
2016 );
2017 }
2018 }
2019 }
2020
2021 let conn = self.connect_h2_direct_connection(uri, timeouts).await?;
2022 self.start_h2_direct_response(H2DirectResponseRequest {
2023 conn,
2024 method,
2025 uri: uri.clone(),
2026 headers,
2027 body_timeouts,
2028 pool_key: pool_key.clone(),
2029 ttfb_timeout: timeouts.ttfb,
2030 })
2031 .await
2032 }
2033
2034 async fn do_send_http1(
2035 stream: MaybeHttpsStream,
2036 method: Method,
2037 uri: &Uri,
2038 headers: Headers,
2039 body: Option<Bytes>,
2040 ) -> Result<(Response, MaybeHttpsStream)> {
2041 let mut conn = H1Connection::new(stream);
2042 let response = conn.send_request(method, uri, &headers, body).await?;
2043 let stream = conn.into_inner();
2044 Ok((response, stream))
2045 }
2046
2047 fn origin_for_url(url: &Url) -> String {
2049 let scheme = url.scheme();
2050 let host = url.host_str().unwrap_or("localhost");
2051 let port = url
2052 .port_or_known_default()
2053 .unwrap_or(if scheme == "https" { 443 } else { 80 });
2054
2055 if (scheme == "https" && port == 443) || (scheme == "http" && port == 80) {
2056 format!("{}://{}", scheme, host)
2057 } else {
2058 format!("{}://{}:{}", scheme, host, port)
2059 }
2060 }
2061
2062 fn is_cross_origin(a: &Url, b: &Url) -> bool {
2063 a.scheme() != b.scheme()
2064 || a.host_str() != b.host_str()
2065 || a.port_or_known_default() != b.port_or_known_default()
2066 }
2067}
2068
2069impl ClientBuilder {
2070 pub fn new() -> Self {
2079 Self {
2080 fingerprint: FingerprintProfile::default(),
2081 http2_settings: None,
2082 pseudo_order: None,
2083 timeouts: Timeouts::default(),
2084 dns_config: DnsConfig::new(),
2085 pool_idle_timeout: Duration::from_secs(30),
2086 pool_max_idle_per_host: 6,
2087 h1_max_connections_per_origin: 6,
2088 h3_max_idle_timeout: None,
2089 h3_fingerprint: None,
2090 h3_backend: H3Backend::Native,
2091 h3_transport_config: H3TransportConfig::default(),
2092 h2_transport_config: H2TransportConfig::default(),
2093 h2_direct_streaming_responses: false,
2094 tcp_keepalive: None,
2095 tcp_keepalive_interval: None,
2096 tcp_keepalive_retries: None,
2097 tcp_fingerprint: None,
2098 prefer_http2: true, h3_upgrade_enabled: true, http2_prior_knowledge: false,
2101 root_certs: Vec::new(),
2102 use_platform_roots: false,
2103 danger_accept_invalid_certs: false,
2104 localhost_allows_invalid_certs: true, default_headers: Headers::new(),
2106 redirect_policy: RedirectPolicy::None,
2107 cookie_store: None,
2108 http_tls_early_data: false,
2109 }
2110 }
2111
2112 pub fn fingerprint(mut self, fingerprint: FingerprintProfile) -> Self {
2114 self.fingerprint = fingerprint;
2115 self
2116 }
2117
2118 pub fn http2_settings(mut self, settings: Http2Settings) -> Self {
2120 self.http2_settings = Some(settings);
2121 self
2122 }
2123
2124 pub fn pseudo_order(mut self, order: PseudoHeaderOrder) -> Self {
2126 self.pseudo_order = Some(order);
2127 self
2128 }
2129
2130 pub fn timeouts(mut self, timeouts: Timeouts) -> Self {
2134 self.timeouts = timeouts;
2135 self
2136 }
2137
2138 pub fn api_timeouts(mut self) -> Self {
2142 self.timeouts = Timeouts::api_defaults();
2143 self
2144 }
2145
2146 pub fn streaming_timeouts(mut self) -> Self {
2151 self.timeouts = Timeouts::streaming_defaults();
2152 self
2153 }
2154
2155 #[deprecated(
2160 since = "1.0.2",
2161 note = "Use `timeouts()` or `total_timeout()` instead"
2162 )]
2163 pub fn timeout(mut self, timeout: Duration) -> Self {
2164 self.timeouts.total = Some(timeout);
2165 self
2166 }
2167
2168 pub fn total_timeout(mut self, timeout: Duration) -> Self {
2170 self.timeouts.total = Some(timeout);
2171 self
2172 }
2173
2174 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
2176 self.timeouts.connect = Some(timeout);
2177 self
2178 }
2179
2180 pub fn ttfb_timeout(mut self, timeout: Duration) -> Self {
2182 self.timeouts.ttfb = Some(timeout);
2183 self
2184 }
2185
2186 pub fn read_timeout(mut self, timeout: Duration) -> Self {
2188 self.timeouts.read_idle = Some(timeout);
2189 self
2190 }
2191
2192 pub fn write_timeout(mut self, timeout: Duration) -> Self {
2194 self.timeouts.write_idle = Some(timeout);
2195 self
2196 }
2197
2198 pub fn pool_acquire_timeout(mut self, timeout: Duration) -> Self {
2200 self.timeouts.pool_acquire = Some(timeout);
2201 self
2202 }
2203
2204 pub fn pool_idle_timeout(mut self, timeout: Duration) -> Self {
2206 self.pool_idle_timeout = timeout;
2207 self
2208 }
2209
2210 pub fn pool_max_idle_per_host(mut self, max: usize) -> Self {
2212 self.pool_max_idle_per_host = max;
2213 self
2214 }
2215
2216 pub fn h1_max_connections_per_origin(mut self, max: usize) -> Self {
2222 self.h1_max_connections_per_origin = max;
2223 self
2224 }
2225
2226 pub fn h1_max_connections_per_host(self, max: usize) -> Self {
2228 self.h1_max_connections_per_origin(max)
2229 }
2230
2231 pub fn hickory_dns(mut self, enable: bool) -> Self {
2235 self.dns_config = self.dns_config.with_cache_enabled(enable);
2236 self
2237 }
2238
2239 pub fn trust_dns(self, enable: bool) -> Self {
2241 self.hickory_dns(enable)
2242 }
2243
2244 pub fn dns_cache_ttl(mut self, ttl: Duration) -> Self {
2246 self.dns_config = self.dns_config.with_cache_ttl(ttl);
2247 self
2248 }
2249
2250 pub fn resolve(self, domain: &str, addr: SocketAddr) -> Self {
2252 self.resolve_to_addrs(domain, &[addr])
2253 }
2254
2255 pub fn resolve_to_addrs(mut self, domain: &str, addrs: &[SocketAddr]) -> Self {
2257 self.dns_config = self.dns_config.with_override(domain, addrs.to_vec());
2258 self
2259 }
2260
2261 pub fn dns_resolver<R: Resolve + 'static>(mut self, resolver: Arc<R>) -> Self {
2263 self.dns_config = self.dns_config.with_resolver(resolver);
2264 self
2265 }
2266
2267 pub fn dns_resolver2<R: Resolve + 'static>(mut self, resolver: R) -> Self {
2269 self.dns_config = self.dns_config.with_resolver(Arc::new(resolver));
2270 self
2271 }
2272
2273 pub fn tcp_keepalive(mut self, val: Option<Duration>) -> Self {
2275 self.tcp_keepalive = val;
2276 self
2277 }
2278
2279 pub fn tcp_keepalive_interval(mut self, val: Option<Duration>) -> Self {
2281 self.tcp_keepalive_interval = val;
2282 self
2283 }
2284
2285 pub fn tcp_keepalive_retries(mut self, retries: Option<u32>) -> Self {
2287 self.tcp_keepalive_retries = retries;
2288 self
2289 }
2290
2291 pub fn with_tcp_notsent_lowat(mut self, bytes: u32) -> Self {
2296 let mut fp = self.tcp_fingerprint.take().unwrap_or_default();
2297 fp.tcp_notsent_lowat = Some(bytes);
2298 self.tcp_fingerprint = Some(fp);
2299 self
2300 }
2301
2302 pub fn http2_initial_stream_window_size(mut self, size: Option<u32>) -> Self {
2304 if let Some(size) = size {
2305 let mut settings = self
2306 .http2_settings
2307 .unwrap_or_else(|| self.fingerprint.http2_settings());
2308 settings.initial_window_size = size;
2309 self.http2_settings = Some(settings);
2310 }
2311 self
2312 }
2313
2314 pub fn http2_initial_connection_window_size(mut self, size: Option<u32>) -> Self {
2316 if let Some(size) = size {
2317 let mut settings = self
2318 .http2_settings
2319 .unwrap_or_else(|| self.fingerprint.http2_settings());
2320 settings.initial_window_update = size.saturating_sub(65_535);
2321 self.http2_settings = Some(settings);
2322 }
2323 self
2324 }
2325
2326 pub fn http2_adaptive_window(self, _enabled: bool) -> Self {
2329 self
2330 }
2331
2332 pub fn http2_keep_alive_interval(mut self, interval: Option<Duration>) -> Self {
2334 self.h2_transport_config.keep_alive_interval = interval;
2335 self
2336 }
2337
2338 pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
2340 self.h2_transport_config.keep_alive_timeout = timeout;
2341 self
2342 }
2343
2344 pub fn http2_keep_alive_while_idle(mut self, enabled: bool) -> Self {
2346 self.h2_transport_config.keep_alive_while_idle = enabled;
2347 self
2348 }
2349
2350 pub fn h2_max_concurrent_streams_per_connection(mut self, max: u32) -> Self {
2356 self.h2_transport_config
2357 .max_concurrent_streams_per_connection = (max > 0).then_some(max);
2358 self
2359 }
2360
2361 pub fn h2_max_streams_per_origin(self, max: u32) -> Self {
2363 self.h2_max_concurrent_streams_per_connection(max)
2364 }
2365
2366 pub fn h2_streaming_body_buffer_slots(mut self, slots: usize) -> Self {
2368 self.h2_transport_config.streaming_body_buffer_slots = slots.max(1);
2369 self
2370 }
2371
2372 pub fn h2_body_buffer_slots(self, slots: usize) -> Self {
2374 self.h2_streaming_body_buffer_slots(slots)
2375 }
2376
2377 pub fn capacity_policy(mut self, policy: CapacityPolicy) -> Self {
2379 self.h1_max_connections_per_origin = policy.max_pending_per_origin;
2380 self.h2_transport_config
2381 .max_concurrent_streams_per_connection =
2382 Some(policy.max_pending_per_origin.min(u32::MAX as usize) as u32);
2383 self.h2_transport_config.streaming_body_buffer_slots =
2384 policy.streaming_body_buffer_slots.max(1);
2385 self.h3_transport_config.streaming_body_buffer_slots =
2386 policy.streaming_body_buffer_slots.max(1);
2387 self.h3_transport_config.tunnel_outbound_byte_budget = policy
2388 .h3_tunnel_outbound_byte_budget
2389 .max(crate::transport::h3::MIN_H3_TUNNEL_OUTBOUND_BYTE_BUDGET);
2390 self.h3_transport_config.tunnel_inbound_byte_budget = policy
2391 .h3_tunnel_inbound_byte_budget
2392 .max(crate::transport::h3::MIN_H3_TUNNEL_INBOUND_BYTE_BUDGET);
2393 self
2394 }
2395
2396 pub fn h2_direct_streaming_responses(mut self, enabled: bool) -> Self {
2404 self.h2_direct_streaming_responses = enabled;
2405 self
2406 }
2407
2408 pub fn h3_max_idle_timeout(mut self, timeout_ms: u64) -> Self {
2410 self.h3_max_idle_timeout = Some(timeout_ms);
2411 self
2412 }
2413
2414 pub fn h3_fingerprint(mut self, fingerprint: Http3Fingerprint) -> Self {
2416 self.h3_fingerprint = Some(fingerprint);
2417 self
2418 }
2419
2420 fn update_h3_fingerprint(mut self, update: impl FnOnce(&mut Http3Fingerprint)) -> Self {
2421 let mut fingerprint = self
2422 .h3_fingerprint
2423 .take()
2424 .unwrap_or_else(|| self.fingerprint.http3_fingerprint());
2425 update(&mut fingerprint);
2426 self.h3_fingerprint = Some(fingerprint);
2427 self
2428 }
2429
2430 pub fn h3_initial_max_data(self, bytes: u64) -> Self {
2432 self.update_h3_fingerprint(|fingerprint| {
2433 fingerprint.transport.initial_max_data = bytes;
2434 })
2435 }
2436
2437 pub fn h3_initial_max_stream_data_bidi_local(self, bytes: u64) -> Self {
2439 self.update_h3_fingerprint(|fingerprint| {
2440 fingerprint.transport.initial_max_stream_data_bidi_local = bytes;
2441 })
2442 }
2443
2444 pub fn h3_initial_max_stream_data_bidi_remote(self, bytes: u64) -> Self {
2446 self.update_h3_fingerprint(|fingerprint| {
2447 fingerprint.transport.initial_max_stream_data_bidi_remote = bytes;
2448 })
2449 }
2450
2451 pub fn h3_initial_max_stream_data_uni(self, bytes: u64) -> Self {
2453 self.update_h3_fingerprint(|fingerprint| {
2454 fingerprint.transport.initial_max_stream_data_uni = bytes;
2455 })
2456 }
2457
2458 pub fn h3_initial_max_streams_bidi(self, streams: u64) -> Self {
2460 self.update_h3_fingerprint(|fingerprint| {
2461 fingerprint.transport.initial_max_streams_bidi = streams;
2462 })
2463 }
2464
2465 pub fn h3_initial_max_streams_uni(self, streams: u64) -> Self {
2467 self.update_h3_fingerprint(|fingerprint| {
2468 fingerprint.transport.initial_max_streams_uni = streams;
2469 })
2470 }
2471
2472 pub fn h3_max_connection_window(self, bytes: u64) -> Self {
2474 self.update_h3_fingerprint(|fingerprint| {
2475 fingerprint.transport.max_connection_window = bytes;
2476 })
2477 }
2478
2479 pub fn h3_max_stream_window(self, bytes: u64) -> Self {
2481 self.update_h3_fingerprint(|fingerprint| {
2482 fingerprint.transport.max_stream_window = bytes;
2483 })
2484 }
2485
2486 pub fn h3_streaming_body_buffer_slots(mut self, slots: usize) -> Self {
2488 self.h3_transport_config.streaming_body_buffer_slots = slots.max(1);
2489 self
2490 }
2491
2492 pub fn h3_body_buffer_slots(self, slots: usize) -> Self {
2494 self.h3_streaming_body_buffer_slots(slots)
2495 }
2496
2497 pub fn h3_tunnel_outbound_byte_budget(mut self, bytes: usize) -> Self {
2499 self.h3_transport_config.tunnel_outbound_byte_budget =
2500 bytes.max(crate::transport::h3::MIN_H3_TUNNEL_OUTBOUND_BYTE_BUDGET);
2501 self
2502 }
2503
2504 pub fn h3_tunnel_inbound_byte_budget(mut self, bytes: usize) -> Self {
2506 self.h3_transport_config.tunnel_inbound_byte_budget =
2507 bytes.max(crate::transport::h3::MIN_H3_TUNNEL_INBOUND_BYTE_BUDGET);
2508 self
2509 }
2510
2511 pub fn h3_backend(mut self, backend: H3Backend) -> Self {
2513 self.h3_backend = backend;
2514 self
2515 }
2516
2517 pub fn default_headers(mut self, headers: impl Into<Headers>) -> Self {
2519 self.default_headers = headers.into();
2520 self
2521 }
2522
2523 pub fn default_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
2525 self.default_headers.insert(name, value);
2526 self
2527 }
2528
2529 pub fn user_agent(mut self, value: impl Into<String>) -> Self {
2531 self.default_headers.insert("User-Agent", value.into());
2532 self
2533 }
2534
2535 pub fn redirect_policy(mut self, policy: RedirectPolicy) -> Self {
2537 self.redirect_policy = policy;
2538 self
2539 }
2540
2541 pub fn cookie_store(mut self, enabled: bool) -> Self {
2543 if enabled {
2544 self.cookie_store = Some(Arc::new(RwLock::new(CookieJar::new())));
2545 } else {
2546 self.cookie_store = None;
2547 }
2548 self
2549 }
2550
2551 pub fn cookie_jar(mut self, jar: Arc<RwLock<CookieJar>>) -> Self {
2553 self.cookie_store = Some(jar);
2554 self
2555 }
2556
2557 pub fn prefer_http2(mut self, prefer: bool) -> Self {
2559 self.prefer_http2 = prefer;
2560 self
2561 }
2562
2563 pub fn h3_upgrade(mut self, enabled: bool) -> Self {
2570 self.h3_upgrade_enabled = enabled;
2571 self
2572 }
2573
2574 pub fn http2_prior_knowledge(mut self, enabled: bool) -> Self {
2577 self.http2_prior_knowledge = enabled;
2578 if enabled {
2580 self.prefer_http2 = true;
2581 }
2582 self
2583 }
2584
2585 pub fn add_root_certificate(mut self, cert: Vec<u8>) -> Self {
2587 self.root_certs.push(cert);
2588 self
2589 }
2590
2591 pub fn with_platform_roots(mut self, enabled: bool) -> Self {
2602 self.use_platform_roots = enabled;
2603 self
2604 }
2605
2606 pub fn danger_accept_invalid_certs(mut self, accept: bool) -> Self {
2612 self.danger_accept_invalid_certs = accept;
2613 self
2614 }
2615
2616 pub fn localhost_allows_invalid_certs(mut self, allow: bool) -> Self {
2624 self.localhost_allows_invalid_certs = allow;
2625 self
2626 }
2627
2628 pub fn http_tls_early_data(mut self, enabled: bool) -> Self {
2630 self.http_tls_early_data = enabled;
2631 self
2632 }
2633
2634 pub fn build(self) -> Result<Client> {
2636 let session_cache = Arc::new(SessionCache::new());
2637 let tls_fingerprint = self.fingerprint.tls_fingerprint();
2639 let root_certs = self.root_certs.clone();
2640 let mut connector = BoringConnector::with_fingerprint(tls_fingerprint.clone())
2641 .with_shared_session_cache(session_cache.clone())
2642 .with_early_data(self.http_tls_early_data)
2643 .with_root_certificates(self.root_certs.clone())
2644 .with_platform_roots(self.use_platform_roots)
2645 .with_dns_config(self.dns_config.clone())
2646 .tcp_keepalive(self.tcp_keepalive)
2647 .tcp_keepalive_interval(self.tcp_keepalive_interval)
2648 .tcp_keepalive_retries(self.tcp_keepalive_retries);
2649
2650 if let Some(tcp_fp) = &self.tcp_fingerprint {
2651 connector = connector.with_tcp_fingerprint(tcp_fp.clone());
2652 }
2653
2654 if self.danger_accept_invalid_certs {
2656 connector = connector.danger_accept_invalid_certs(true);
2657 }
2658
2659 let mut insecure_connector = BoringConnector::with_fingerprint(tls_fingerprint.clone())
2661 .with_shared_session_cache(session_cache)
2662 .with_early_data(self.http_tls_early_data)
2663 .with_root_certificates(self.root_certs.clone())
2664 .with_platform_roots(self.use_platform_roots)
2665 .with_dns_config(self.dns_config.clone())
2666 .tcp_keepalive(self.tcp_keepalive)
2667 .tcp_keepalive_interval(self.tcp_keepalive_interval)
2668 .tcp_keepalive_retries(self.tcp_keepalive_retries)
2669 .danger_accept_invalid_certs(true);
2670
2671 if let Some(tcp_fp) = &self.tcp_fingerprint {
2672 insecure_connector = insecure_connector.with_tcp_fingerprint(tcp_fp.clone());
2673 }
2674
2675 let h3_fingerprint = self
2677 .h3_fingerprint
2678 .unwrap_or_else(|| self.fingerprint.http3_fingerprint());
2679 let mut h3_client = H3Client::with_fingerprint(tls_fingerprint)
2680 .with_http3_fingerprint(h3_fingerprint)
2681 .with_h3_backend(self.h3_backend)
2682 .with_transport_config(self.h3_transport_config)
2683 .with_root_certificates(root_certs)
2684 .with_platform_roots(self.use_platform_roots)
2685 .with_dns_config(self.dns_config.clone());
2686 if let Some(timeout_ms) = self.h3_max_idle_timeout {
2687 h3_client = h3_client.with_max_idle_timeout(timeout_ms);
2688 }
2689 if self.danger_accept_invalid_certs {
2690 h3_client = h3_client.danger_accept_invalid_certs(true);
2691 }
2692
2693 let http2_settings = self
2695 .http2_settings
2696 .unwrap_or_else(|| self.fingerprint.http2_settings());
2697 let pseudo_order = self
2698 .pseudo_order
2699 .unwrap_or_else(|| self.fingerprint.http2_pseudo_order());
2700
2701 let mut h2_transport_config = self.h2_transport_config.clone();
2702 if h2_transport_config.keep_alive_interval.is_none() {
2703 h2_transport_config.keep_alive_interval = http2_settings.ping_interval;
2704 h2_transport_config.keep_alive_while_idle = true;
2705 }
2706
2707 let default_version = if self.prefer_http2 {
2709 HttpVersion::Http2
2710 } else {
2711 HttpVersion::Http1_1
2712 };
2713
2714 let h1_pool = Arc::new(ConnectionPool::with_config(
2718 self.pool_idle_timeout,
2719 self.pool_max_idle_per_host,
2720 100,
2721 ));
2722
2723 let pool_reuse_counter: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
2727 let h3_client = h3_client.with_pool_reuse_counter(pool_reuse_counter.clone());
2728
2729 Ok(Client {
2730 connector,
2731 insecure_connector,
2732 h3_client,
2733 alt_svc_cache: Arc::new(AltSvcCache::new()),
2734 h2_pool: Arc::new(RwLock::new(HashMap::new())),
2735 h2_direct_pool: Arc::new(StdMutex::new(HashMap::new())),
2736 h1_pool,
2737 h1_connection_slots: Arc::new(RwLock::new(HashMap::new())),
2738 h1_max_connections_per_origin: self.h1_max_connections_per_origin,
2739 http2_settings,
2740 pseudo_order,
2741 default_version,
2742 timeouts: self.timeouts,
2743 h2_transport_config,
2744 h2_direct_streaming_responses: self.h2_direct_streaming_responses,
2745 h3_upgrade_enabled: self.h3_upgrade_enabled,
2746 http2_prior_knowledge: self.http2_prior_knowledge,
2747 danger_accept_invalid_certs: self.danger_accept_invalid_certs,
2748 localhost_allows_invalid_certs: self.localhost_allows_invalid_certs,
2749 default_headers: self.default_headers,
2750 redirect_policy: self.redirect_policy,
2751 cookie_store: self.cookie_store,
2752 fingerprint: self.fingerprint,
2753 http_tls_early_data: self.http_tls_early_data,
2754 pool_reuse_counter,
2755 })
2756 }
2757}
2758
2759impl Default for ClientBuilder {
2760 fn default() -> Self {
2761 Self::new()
2762 }
2763}
2764
2765impl Default for AltSvcCache {
2766 fn default() -> Self {
2767 Self::new()
2768 }
2769}