1use crate::url::Url;
11use base64::Engine;
12use bytes::Bytes;
13use http::{Method, Uri};
14use serde::Serialize;
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::{Arc, Mutex as StdMutex};
19use std::time::Duration;
20use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};
21use tokio::time::timeout as tokio_timeout;
22
23use crate::cookie::CookieJar;
24use crate::error::{Error, Result};
25use crate::fingerprint::{http2::Http2Settings, FingerprintProfile, Http3Fingerprint};
26use crate::headers::Headers;
27use crate::pool::alt_svc::AltSvcCache;
28use crate::pool::multiplexer::{ConnectionPool, PoolKey};
29use crate::request::{IntoUrl, RedirectPolicy, Request, RequestBody};
30use crate::response::{Body, Response};
31use crate::timeouts::Timeouts;
32use crate::transport::connector::{AlpnMode, BoringConnector, EarlyDataOutcome, MaybeHttpsStream};
33use crate::transport::dns::{DnsConfig, Resolve};
34use crate::transport::h1::{h1_request_body_kind, H1Connection, H1StreamingOptions};
35use crate::transport::h2::{
36 H2BodyTimeouts, H2Connection, H2DirectBody, H2DirectReuseHook, H2PooledConnection,
37 H2TransportConfig, H2Tunnel, PseudoHeaderOrder, RawH2Connection,
38};
39use crate::transport::h3::{H3Backend, H3Client, H3TransportConfig, H3Tunnel};
40use crate::transport::is_zero_rtt_safe_request;
41use crate::transport::session::SessionCache;
42use crate::transport::tcp::TcpFingerprint;
43use crate::version::HttpVersion;
44use crate::websocket::{WebSocketBuilder, WebSocketClientParts};
45
46type H2DirectPool = Arc<StdMutex<HashMap<PoolKey, Vec<RawH2Connection<MaybeHttpsStream>>>>>;
47
/// Capacity limits bundled into one small, copyable value.
///
/// `CapacityPolicy::bounded` derives a full policy from a single per-origin
/// limit; the `with_*` methods override individual fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CapacityPolicy {
    /// Limit on pending work per origin. `bounded` raises `0` to `1`.
    /// NOTE(review): where this limit is enforced is not visible in this part of the file.
    pub max_pending_per_origin: usize,
    /// Number of buffer slots for streaming bodies; the constructor and
    /// `with_streaming_body_buffer_slots` keep it at `1` or more.
    pub streaming_body_buffer_slots: usize,
    /// Byte budget for the outbound direction of an HTTP/3 tunnel.
    pub h3_tunnel_outbound_byte_budget: usize,
    /// Byte budget for the inbound direction of an HTTP/3 tunnel.
    pub h3_tunnel_inbound_byte_budget: usize,
}
60
61impl CapacityPolicy {
62 pub fn bounded(max_pending_per_origin: usize) -> Self {
63 let normalized = max_pending_per_origin.max(1);
64 Self {
65 max_pending_per_origin: normalized,
66 streaming_body_buffer_slots: normalized,
67 h3_tunnel_outbound_byte_budget: H3TransportConfig::default()
68 .tunnel_outbound_byte_budget,
69 h3_tunnel_inbound_byte_budget: H3TransportConfig::default().tunnel_inbound_byte_budget,
70 }
71 }
72
73 pub fn with_streaming_body_buffer_slots(mut self, slots: usize) -> Self {
78 self.streaming_body_buffer_slots = slots.max(1);
79 self
80 }
81
82 pub fn with_h3_tunnel_byte_budget(mut self, bytes: usize) -> Self {
83 let bytes = bytes
84 .max(crate::transport::h3::MIN_H3_TUNNEL_OUTBOUND_BYTE_BUDGET)
85 .max(crate::transport::h3::MIN_H3_TUNNEL_INBOUND_BYTE_BUDGET);
86 self.h3_tunnel_outbound_byte_budget = bytes;
87 self.h3_tunnel_inbound_byte_budget = bytes;
88 self
89 }
90
91 pub fn with_h3_tunnel_outbound_byte_budget(mut self, bytes: usize) -> Self {
92 self.h3_tunnel_outbound_byte_budget =
93 bytes.max(crate::transport::h3::MIN_H3_TUNNEL_OUTBOUND_BYTE_BUDGET);
94 self
95 }
96
97 pub fn with_h3_tunnel_inbound_byte_budget(mut self, bytes: usize) -> Self {
98 self.h3_tunnel_inbound_byte_budget =
99 bytes.max(crate::transport::h3::MIN_H3_TUNNEL_INBOUND_BYTE_BUDGET);
100 self
101 }
102}
103
/// A raw HTTP/2 connection bundled with the head of a response.
///
/// NOTE(review): the code that produces and consumes this value is outside
/// this part of the file; field descriptions are inferred from names and types.
struct H2DirectStart {
    /// The raw HTTP/2 connection the exchange runs on.
    conn: RawH2Connection<MaybeHttpsStream>,
    /// HTTP/2 stream identifier (presumably of the request's stream).
    stream_id: u32,
    /// Numeric status code.
    status: u16,
    /// Header name/value pairs.
    headers: Vec<(String, String)>,
    /// Presumably true when the stream was already ended with the headers — confirm against producer.
    end_stream: bool,
}
111
/// Inputs for issuing a request on a raw ("direct") HTTP/2 connection.
///
/// NOTE(review): the consumer of this struct is outside this part of the
/// file; field descriptions are inferred from names and types.
struct H2DirectResponseRequest {
    /// The raw HTTP/2 connection to send on.
    conn: RawH2Connection<MaybeHttpsStream>,
    /// Request method.
    method: Method,
    /// Request target.
    uri: Uri,
    /// Request headers.
    headers: Headers,
    /// Timeouts applied to the response body.
    body_timeouts: H2BodyTimeouts,
    /// Pool key identifying the origin (presumably used to return the connection for reuse).
    pool_key: PoolKey,
    /// Optional time-to-first-byte limit.
    ttfb_timeout: Option<Duration>,
}
121
/// HTTP client holding connectors, per-protocol connection pools and the
/// defaults applied to every request it builds.
#[derive(Clone)]
pub struct Client {
    /// Connector used when certificate validation is not relaxed
    /// (see `connector_for_uri`).
    connector: BoringConnector,
    /// Connector selected by `connector_for_uri` when invalid certificates
    /// are accepted for the target.
    insecure_connector: BoringConnector,
    /// HTTP/3 client used for HTTP/3 requests and RFC 9220 tunnels.
    h3_client: H3Client,
    /// Shared Alt-Svc cache, exposed through `alt_svc_cache()`.
    alt_svc_cache: Arc<AltSvcCache>,
    /// Pooled HTTP/2 connections keyed by origin pool key.
    h2_pool: Arc<RwLock<HashMap<PoolKey, H2PooledConnection>>>,
    /// Raw HTTP/2 connections per pool key.
    /// NOTE(review): its users are outside this part of the file.
    h2_direct_pool: H2DirectPool,
    /// Pool of reusable HTTP/1.1 streams.
    h1_pool: Arc<ConnectionPool>,
    /// One semaphore per pool key, created lazily by
    /// `acquire_h1_connection_slot`.
    h1_connection_slots: Arc<RwLock<HashMap<PoolKey, Arc<Semaphore>>>>,
    /// Permits per HTTP/1.1 semaphore; `0` disables the limit.
    h1_max_connections_per_origin: usize,
    /// HTTP/2 settings passed to `H2Connection::connect`.
    http2_settings: Http2Settings,
    /// Pseudo-header order passed to `H2Connection::connect`.
    pseudo_order: PseudoHeaderOrder,
    /// Version used when a request does not specify one.
    default_version: HttpVersion,
    /// Default timeouts; a per-request timeout overrides `total`.
    timeouts: Timeouts,
    /// Configuration handed to each new `H2PooledConnection`.
    h2_transport_config: H2TransportConfig,
    /// When set, body-less HTTP/2 streaming requests with no pooled
    /// connection go through `send_h2_direct_streaming_response`.
    h2_direct_streaming_responses: bool,
    /// NOTE(review): presumably toggles Alt-Svc driven HTTP/3 upgrades — not used in this part of the file.
    h3_upgrade_enabled: bool,
    /// Required for `ws://` RFC 8441 tunnels (HTTP/2 without ALPN).
    http2_prior_knowledge: bool,
    /// Accept invalid certificates for every host.
    danger_accept_invalid_certs: bool,
    /// Accept invalid certificates only for hosts matched by `is_localhost`.
    localhost_allows_invalid_certs: bool,
    /// Headers every new request/websocket builder starts with.
    default_headers: Headers,
    /// Redirect policy consulted by `RequestBuilder::send_streaming`.
    redirect_policy: RedirectPolicy,
    /// Optional shared cookie jar.
    cookie_store: Option<Arc<RwLock<CookieJar>>>,
    /// Fingerprint profile, returned by `fingerprint_profile()`.
    fingerprint: FingerprintProfile,
    /// Allows TLS early data for new HTTPS HTTP/1.1 connections when the
    /// request is 0-RTT safe.
    http_tls_early_data: bool,
    /// Incremented each time a pooled connection is reused.
    pool_reuse_counter: Arc<AtomicUsize>,
}
179
/// Builder for a single HTTP request bound to a [`Client`].
///
/// Errors raised while configuring the builder are stored and reported by
/// `build()`.
pub struct RequestBuilder<'a> {
    /// Client that will execute the request.
    client: &'a Client,
    /// Parsed target URL; `None` when URL conversion failed in `new`.
    url: Option<Url>,
    /// Request method.
    method: Method,
    /// Request headers; start as a clone of the client's default headers.
    headers: Headers,
    /// Request body; starts as `RequestBody::Empty`.
    body: RequestBody,
    /// Explicit HTTP version; `None` means the client's default.
    version: Option<HttpVersion>,
    /// Per-request timeout.
    timeout: Option<Duration>,
    /// First error recorded while building.
    error: Option<Error>,
}
191
/// Builder for an RFC 8441 WebSocket tunnel over HTTP/2.
pub struct WebSocketH2Builder<'a> {
    /// Client whose pool, connectors and settings are used by `open`.
    client: &'a Client,
    /// Parsed `ws://`/`wss://` URL; `None` when URL conversion failed.
    url: Option<Url>,
    /// Headers sent when opening the tunnel; start as the client's defaults.
    headers: Headers,
    /// Deferred URL conversion error, reported by `open`.
    error: Option<Error>,
}
199
/// Builder for an RFC 9220 WebSocket tunnel over HTTP/3.
pub struct WebSocketH3Builder<'a> {
    /// Client whose HTTP/3 client and timeouts are used by `open`.
    client: &'a Client,
    /// Parsed `wss://` URL; `None` when URL conversion failed.
    url: Option<Url>,
    /// Headers sent when opening the tunnel; start as the client's defaults.
    headers: Headers,
    /// Deferred URL conversion error, reported by `open`.
    error: Option<Error>,
}
207
/// Collects the options from which a [`Client`] is built.
///
/// NOTE(review): the builder's methods and `build()` are outside this part of
/// the file, so the comments below only restate what names and types show.
pub struct ClientBuilder {
    /// Fingerprint profile for the client.
    fingerprint: FingerprintProfile,
    /// Explicit HTTP/2 settings; `None` presumably falls back to the profile — confirm in `build()`.
    http2_settings: Option<Http2Settings>,
    /// Explicit pseudo-header order; `None` presumably falls back to the profile — confirm in `build()`.
    pseudo_order: Option<PseudoHeaderOrder>,
    /// Default timeouts.
    timeouts: Timeouts,
    /// DNS resolver configuration.
    dns_config: DnsConfig,
    /// Idle timeout for pooled connections.
    pool_idle_timeout: Duration,
    /// Maximum idle pooled connections per host.
    pool_max_idle_per_host: usize,
    /// Maximum concurrent HTTP/1.1 connections per origin.
    h1_max_connections_per_origin: usize,
    /// HTTP/3 idle timeout (unit not visible here — TODO confirm).
    h3_max_idle_timeout: Option<u64>,
    /// Optional HTTP/3 fingerprint.
    h3_fingerprint: Option<Http3Fingerprint>,
    /// HTTP/3 backend selection.
    h3_backend: H3Backend,
    /// HTTP/3 transport configuration.
    h3_transport_config: H3TransportConfig,
    /// HTTP/2 transport configuration.
    h2_transport_config: H2TransportConfig,
    /// Enables the direct HTTP/2 streaming response path.
    h2_direct_streaming_responses: bool,
    /// TCP keepalive time.
    tcp_keepalive: Option<Duration>,
    /// TCP keepalive probe interval.
    tcp_keepalive_interval: Option<Duration>,
    /// TCP keepalive probe count.
    tcp_keepalive_retries: Option<u32>,
    /// Optional TCP fingerprint.
    tcp_fingerprint: Option<TcpFingerprint>,
    /// Prefer HTTP/2 by default.
    prefer_http2: bool,
    /// Enables HTTP/3 upgrade.
    h3_upgrade_enabled: bool,
    /// Use HTTP/2 without ALPN negotiation.
    http2_prior_knowledge: bool,
    /// Additional root certificates as raw bytes (encoding not visible here).
    root_certs: Vec<Vec<u8>>,
    /// Whether platform root certificates are used.
    use_platform_roots: bool,
    /// Accept invalid certificates for every host.
    danger_accept_invalid_certs: bool,
    /// Accept invalid certificates for localhost only.
    localhost_allows_invalid_certs: bool,
    /// Headers applied to every request.
    default_headers: Headers,
    /// Redirect policy.
    redirect_policy: RedirectPolicy,
    /// Optional shared cookie jar.
    cookie_store: Option<Arc<RwLock<CookieJar>>>,
    /// Enables TLS early data for HTTP requests.
    http_tls_early_data: bool,
}
247
248impl Client {
249 pub fn new() -> Result<Self> {
251 ClientBuilder::new().build()
252 }
253
    /// Returns a fresh [`ClientBuilder`] for configuring a client.
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
258
259 pub fn connection_reuse_count(&self) -> usize {
263 self.pool_reuse_counter.load(Ordering::Relaxed)
264 }
265
266 pub fn get(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
268 RequestBuilder::new(self, Method::GET, url)
269 }
270
271 pub fn post(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
273 RequestBuilder::new(self, Method::POST, url)
274 }
275
276 pub fn put(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
278 RequestBuilder::new(self, Method::PUT, url)
279 }
280
281 pub fn delete(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
283 RequestBuilder::new(self, Method::DELETE, url)
284 }
285
286 pub fn head(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
288 RequestBuilder::new(self, Method::HEAD, url)
289 }
290
291 pub fn patch(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
293 RequestBuilder::new(self, Method::PATCH, url)
294 }
295
    /// Starts building a request with an arbitrary `method` for `url`.
    ///
    /// URL conversion errors are stored in the builder and reported when the
    /// request is built.
    pub fn request(&self, method: Method, url: impl IntoUrl) -> RequestBuilder<'_> {
        RequestBuilder::new(self, method, url)
    }
300
    /// Starts building an RFC 8441 WebSocket tunnel over HTTP/2 for `url`.
    pub fn websocket_h2(&self, url: impl IntoUrl) -> WebSocketH2Builder<'_> {
        WebSocketH2Builder::new(self, url)
    }
305
    /// Starts building an RFC 9220 WebSocket tunnel over HTTP/3 for `url`.
    pub fn websocket_h3(&self, url: impl IntoUrl) -> WebSocketH3Builder<'_> {
        WebSocketH3Builder::new(self, url)
    }
310
311 pub fn websocket(&self, url: impl IntoUrl) -> WebSocketBuilder<'_> {
313 Client::websocket_with_parts(
314 WebSocketClientParts {
315 connector: &self.connector,
316 insecure_connector: &self.insecure_connector,
317 default_headers: &self.default_headers,
318 timeouts: &self.timeouts,
319 cookie_store: self.cookie_store.as_ref(),
320 danger_accept_invalid_certs: self.danger_accept_invalid_certs,
321 localhost_allows_invalid_certs: self.localhost_allows_invalid_certs,
322 },
323 url,
324 )
325 }
326
    /// Returns the shared Alt-Svc cache handle.
    pub fn alt_svc_cache(&self) -> &Arc<AltSvcCache> {
        &self.alt_svc_cache
    }
331
    /// Returns the HTTP/3 client owned by this client.
    pub fn h3_client(&self) -> &H3Client {
        &self.h3_client
    }
337
    /// Returns the fingerprint profile this client was configured with.
    pub fn fingerprint_profile(&self) -> FingerprintProfile {
        self.fingerprint
    }
342
    /// Returns the HTTP/2 settings used when establishing HTTP/2 connections.
    pub fn http2_settings(&self) -> &Http2Settings {
        &self.http2_settings
    }
347
    /// Returns the pseudo-header order used for HTTP/2 connections.
    pub fn pseudo_order(&self) -> PseudoHeaderOrder {
        self.pseudo_order
    }
352
    /// Returns the headers every new request builder starts with.
    pub fn default_headers(&self) -> &Headers {
        &self.default_headers
    }
357
    /// Returns the HTTP/1.1 connection limit per origin; `0` means no limit
    /// is enforced by `acquire_h1_connection_slot`.
    pub fn h1_max_connections_per_origin(&self) -> usize {
        self.h1_max_connections_per_origin
    }
362
363 pub fn h2_max_concurrent_streams_per_connection(&self) -> Option<u32> {
365 self.h2_transport_config
366 .max_concurrent_streams_per_connection
367 }
368
369 pub fn h2_streaming_body_buffer_slots(&self) -> usize {
371 self.h2_transport_config.streaming_body_buffer_slots
372 }
373
374 pub fn http2_keep_alive_interval(&self) -> Option<Duration> {
376 self.h2_transport_config.keep_alive_interval
377 }
378
379 pub fn http2_keep_alive_while_idle(&self) -> bool {
381 self.h2_transport_config.keep_alive_while_idle
382 }
383
    /// Returns the streaming body buffer slot count reported by the HTTP/3
    /// client.
    pub fn h3_streaming_body_buffer_slots(&self) -> usize {
        self.h3_client.streaming_body_buffer_slots()
    }
388
    /// Returns the outbound tunnel byte budget reported by the HTTP/3 client.
    pub fn h3_tunnel_outbound_byte_budget(&self) -> usize {
        self.h3_client.tunnel_outbound_byte_budget()
    }
393
    /// Returns the inbound tunnel byte budget reported by the HTTP/3 client.
    pub fn h3_tunnel_inbound_byte_budget(&self) -> usize {
        self.h3_client.tunnel_inbound_byte_budget()
    }
398
399 async fn acquire_h1_connection_slot(
400 &self,
401 key: &PoolKey,
402 timeouts: &Timeouts,
403 ) -> Result<Option<OwnedSemaphorePermit>> {
404 if self.h1_max_connections_per_origin == 0 {
405 return Ok(None);
406 }
407
408 let semaphore = {
409 let mut slots = self.h1_connection_slots.write().await;
410 slots
411 .entry(key.clone())
412 .or_insert_with(|| Arc::new(Semaphore::new(self.h1_max_connections_per_origin)))
413 .clone()
414 };
415
416 let acquire = semaphore.acquire_owned();
417 let permit = if let Some(pool_acquire_timeout) = timeouts.pool_acquire {
418 tokio_timeout(pool_acquire_timeout, acquire)
419 .await
420 .map_err(|_| Error::PoolAcquireTimeout(pool_acquire_timeout))?
421 } else {
422 acquire.await
423 }
424 .map_err(|_| Error::Connection("HTTP/1.1 connection scheduler closed".into()))?;
425
426 Ok(Some(permit))
427 }
428
429 fn is_localhost(host: &str) -> bool {
431 host == "localhost" || host == "127.0.0.1" || host == "::1"
432 }
433
434 fn connector_for_uri(&self, uri: &Uri) -> &BoringConnector {
436 if self.danger_accept_invalid_certs {
438 return &self.insecure_connector;
439 }
440
441 if self.localhost_allows_invalid_certs {
443 if let Some(host) = uri.host() {
444 if Self::is_localhost(host) {
445 return &self.insecure_connector;
446 }
447 }
448 }
449
450 &self.connector
451 }
452}
453
454impl<'a> WebSocketH2Builder<'a> {
455 fn new(client: &'a Client, url: impl IntoUrl) -> Self {
456 let mut error = None;
457 let url = match url.into_url() {
458 Ok(url) => Some(url),
459 Err(err) => {
460 error = Some(err);
461 None
462 }
463 };
464
465 Self {
466 client,
467 url,
468 headers: client.default_headers.clone(),
469 error,
470 }
471 }
472
    /// Sets one header on the tunnel request via `Headers::insert`.
    pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(key, value);
        self
    }
478
    /// Replaces the whole header set, including the defaults copied from the
    /// client when the builder was created.
    pub fn headers(mut self, headers: impl Into<Headers>) -> Self {
        self.headers = headers.into();
        self
    }
484
485 pub async fn open(self) -> Result<H2Tunnel> {
487 if let Some(err) = self.error {
488 return Err(err);
489 }
490
491 let url = self.url.ok_or_else(|| Error::missing("websocket URL"))?;
492
493 let websocket_scheme = url.scheme();
494 let h2_scheme = match websocket_scheme {
495 "wss" => "https",
496 "ws" => {
497 if !self.client.http2_prior_knowledge {
498 return Err(Error::WebSocketUnsupported(
499 "ws:// RFC 8441 requires explicit HTTP/2 prior knowledge".into(),
500 ));
501 }
502 "http"
503 }
504 other => {
505 return Err(Error::WebSocketUnsupported(format!(
506 "RFC 8441 requires ws:// or wss:// URL, got {other}"
507 )));
508 }
509 };
510
511 let mut h2_url = url.clone();
512 h2_url
513 .set_scheme(h2_scheme)
514 .map_err(|_| Error::WebSocketUnsupported("invalid WebSocket URL scheme".into()))?;
515
516 let uri: Uri = h2_url
517 .as_str()
518 .parse()
519 .map_err(|e| Error::HttpProtocol(format!("Invalid URI: {}", e)))?;
520
521 let headers = self.headers.clone();
522 let pool_key = self.client.make_pool_key(&uri);
523
524 if let Some(conn) = {
525 let pool = self.client.h2_pool.read().await;
526 pool.get(&pool_key).cloned()
527 } {
528 match conn.open_websocket_tunnel(uri.clone(), &headers).await {
529 Ok(tunnel) => return Ok(tunnel),
530 Err(err) => {
531 tracing::debug!("Pooled RFC 8441 tunnel open failed, reconnecting: {}", err);
532 let mut pool = self.client.h2_pool.write().await;
533 pool.remove(&pool_key);
534 }
535 }
536 }
537
538 let connector = self.client.connector_for_uri(&uri);
539 let stream = connector.connect(&uri).await?;
540
541 let use_http2 = if websocket_scheme == "ws" && self.client.http2_prior_knowledge {
542 true
543 } else if let MaybeHttpsStream::Https(ref ssl_stream) = stream {
544 ssl_stream.ssl().selected_alpn_protocol() == Some(b"h2")
545 } else {
546 false
547 };
548
549 if !use_http2 {
550 return Err(Error::WebSocketUnsupported(
551 "RFC 8441 WebSocket requires ALPN h2 or explicit HTTP/2 prior knowledge".into(),
552 ));
553 }
554
555 let h2_conn = H2Connection::connect(
556 stream,
557 self.client.http2_settings.clone(),
558 self.client.pseudo_order,
559 )
560 .await?;
561 let pooled_conn =
562 H2PooledConnection::new_with_config(h2_conn, self.client.h2_transport_config.clone());
563
564 {
565 let mut pool = self.client.h2_pool.write().await;
566 pool.insert(pool_key, pooled_conn.clone());
567 }
568
569 pooled_conn.open_websocket_tunnel(uri, &headers).await
570 }
571}
572
573impl<'a> WebSocketH3Builder<'a> {
574 fn new(client: &'a Client, url: impl IntoUrl) -> Self {
575 let mut error = None;
576 let url = match url.into_url() {
577 Ok(url) => Some(url),
578 Err(err) => {
579 error = Some(err);
580 None
581 }
582 };
583
584 Self {
585 client,
586 url,
587 headers: client.default_headers.clone(),
588 error,
589 }
590 }
591
    /// Sets one header on the tunnel request via `Headers::insert`.
    pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(key, value);
        self
    }
597
    /// Replaces the whole header set, including the defaults copied from the
    /// client when the builder was created.
    pub fn headers(mut self, headers: impl Into<Headers>) -> Self {
        self.headers = headers.into();
        self
    }
603
604 pub async fn open(self) -> Result<H3Tunnel> {
606 if let Some(err) = self.error {
607 return Err(err);
608 }
609
610 let url = self.url.ok_or_else(|| Error::missing("websocket URL"))?;
611 if url.scheme() != "wss" {
612 return Err(Error::WebSocketUnsupported(
613 "RFC 9220 WebSocket over HTTP/3 requires wss://".into(),
614 ));
615 }
616
617 let mut h3_url = url.clone();
618 h3_url
619 .set_scheme("https")
620 .map_err(|_| Error::WebSocketUnsupported("invalid WebSocket URL scheme".into()))?;
621
622 let mut h3_client = self.client.h3_client.clone();
623 if self.client.danger_accept_invalid_certs
624 || (self.client.localhost_allows_invalid_certs
625 && h3_url.host_str().is_some_and(Client::is_localhost))
626 {
627 h3_client = h3_client.danger_accept_invalid_certs(true);
628 }
629
630 let fut = h3_client.open_websocket_tunnel(h3_url.as_str(), &self.headers);
631 if let Some(total_timeout) = self.client.timeouts.total {
632 tokio_timeout(total_timeout, fut)
633 .await
634 .map_err(|_| Error::TotalTimeout(total_timeout))?
635 } else {
636 fut.await
637 }
638 }
639}
640
641impl<'a> RequestBuilder<'a> {
642 fn new(client: &'a Client, method: Method, url: impl IntoUrl) -> Self {
643 let mut error = None;
644 let url = match url.into_url() {
645 Ok(url) => Some(url),
646 Err(err) => {
647 error = Some(err);
648 None
649 }
650 };
651
652 Self {
653 client,
654 url,
655 method,
656 headers: client.default_headers.clone(),
657 body: RequestBody::Empty,
658 version: None,
659 timeout: None,
660 error,
661 }
662 }
663
664 fn set_error(&mut self, error: Error) {
665 if self.error.is_none() {
666 self.error = Some(error);
667 }
668 }
669
670 fn ensure_content_type(&mut self, value: &str) {
671 if !self.headers.contains("content-type") {
672 self.headers.insert("Content-Type", value.to_string());
673 }
674 }
675
    /// Sets one request header via `Headers::insert`.
    pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(key, value);
        self
    }
681
    /// Adds one request header via `Headers::append`.
    pub fn header_append(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.append(key, value);
        self
    }
687
    /// Replaces the whole header set, including the defaults copied from the
    /// client and any headers set earlier on this builder.
    pub fn headers(mut self, headers: impl Into<Headers>) -> Self {
        self.headers = headers.into();
        self
    }
693
    /// Sets the request body, replacing any body set earlier.
    pub fn body(mut self, body: impl Into<RequestBody>) -> Self {
        self.body = body.into();
        self
    }
701
    /// Uses `stream` as the request body, with no declared content length.
    ///
    /// Requests with a streaming body are rejected by `send()`; use
    /// `send_streaming()` instead.
    pub fn body_stream<S>(mut self, stream: S) -> Self
    where
        S: futures_core::Stream<Item = std::result::Result<Bytes, Error>> + Send + 'static,
    {
        self.body = RequestBody::Stream {
            stream: Box::pin(stream),
            content_length: None,
        };
        self
    }
715
    /// Uses `stream` as the request body and records `content_length` as its
    /// declared size. The declared size is not checked against the stream here.
    ///
    /// Requests with a streaming body are rejected by `send()`; use
    /// `send_streaming()` instead.
    pub fn body_stream_sized<S>(mut self, stream: S, content_length: u64) -> Self
    where
        S: futures_core::Stream<Item = std::result::Result<Bytes, Error>> + Send + 'static,
    {
        self.body = RequestBody::Stream {
            stream: Box::pin(stream),
            content_length: Some(content_length),
        };
        self
    }
729
730 pub fn query<T: Serialize + ?Sized>(mut self, query: &T) -> Self {
732 if self.error.is_some() {
733 return self;
734 }
735
736 let url = match self.url.as_mut() {
737 Some(url) => url,
738 None => return self,
739 };
740
741 match serde_urlencoded::to_string(query) {
742 Ok(encoded) => {
743 if !encoded.is_empty() {
744 let merged = match url.query() {
745 Some(existing) if !existing.is_empty() => {
746 format!("{}&{}", existing, encoded)
747 }
748 _ => encoded,
749 };
750 if let Err(err) = url.set_query(Some(&merged)) {
751 self.set_error(err.into());
752 }
753 }
754 }
755 Err(err) => self.set_error(err.into()),
756 }
757
758 self
759 }
760
761 pub fn json<T: Serialize + ?Sized>(mut self, json: &T) -> Self {
763 if self.error.is_some() {
764 return self;
765 }
766
767 match serde_json::to_vec(json) {
768 Ok(bytes) => {
769 self.body = RequestBody::Json(bytes);
770 self.ensure_content_type("application/json");
771 }
772 Err(err) => self.set_error(err.into()),
773 }
774
775 self
776 }
777
778 pub fn form<T: Serialize + ?Sized>(mut self, form: &T) -> Self {
780 if self.error.is_some() {
781 return self;
782 }
783
784 match serde_urlencoded::to_string(form) {
785 Ok(encoded) => {
786 self.body = RequestBody::Form(encoded);
787 self.ensure_content_type("application/x-www-form-urlencoded");
788 }
789 Err(err) => self.set_error(err.into()),
790 }
791
792 self
793 }
794
795 pub fn bearer_auth(mut self, token: impl AsRef<str>) -> Self {
797 self.headers
798 .insert("Authorization", format!("Bearer {}", token.as_ref()));
799 self
800 }
801
802 pub fn basic_auth<P: AsRef<str>>(
804 mut self,
805 username: impl AsRef<str>,
806 password: Option<P>,
807 ) -> Self {
808 let creds = match password {
809 Some(p) => format!("{}:{}", username.as_ref(), p.as_ref()),
810 None => format!("{}:", username.as_ref()),
811 };
812 let encoded = base64::engine::general_purpose::STANDARD.encode(creds.as_bytes());
813 self.headers
814 .insert("Authorization", format!("Basic {}", encoded));
815 self
816 }
817
    /// Sets a per-request timeout.
    ///
    /// On the streaming send path this value replaces the client's `total`
    /// timeout for this request.
    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }
823
    /// Requests a specific HTTP version for this request; when unset, the
    /// client's default version is used.
    pub fn version(mut self, version: HttpVersion) -> Self {
        self.version = Some(version);
        self
    }
829
830 pub fn build(self) -> Result<Request> {
832 if let Some(error) = self.error {
833 return Err(error);
834 }
835
836 let url = self.url.ok_or_else(|| Error::missing("url"))?;
837
838 Ok(Request {
839 method: self.method,
840 url,
841 headers: self.headers,
842 body: self.body,
843 version: self.version,
844 timeout: self.timeout,
845 })
846 }
847
848 pub async fn send(self) -> Result<Response> {
850 let client = self.client.clone();
851 let request = self.build()?;
852 if request.body.is_streaming() {
853 return Err(Error::HttpProtocol(
854 "streaming request bodies require send_streaming()".into(),
855 ));
856 }
857 client.execute(request).await
858 }
859
    /// Sends the request and returns a response whose body is streamed,
    /// applying the client's redirect policy.
    ///
    /// * With `RedirectPolicy::None` a single attempt is made and its
    ///   response is returned as-is.
    /// * With a streaming request body a single attempt is made; a redirect
    ///   response is an error because the body cannot be sent again.
    /// * Otherwise redirects are followed in a loop. A redirect response
    ///   without a redirect URL is returned as-is.
    ///
    /// # Errors
    /// * `Error::HttpProtocol` when a redirect would require replaying a
    ///   streaming request body.
    /// * `Error::RedirectLimit` when a `Limited` policy's limit has already
    ///   been reached and another redirect arrives.
    /// * Any error from building, sending, draining a redirect body, joining
    ///   the redirect URL or constructing the follow-up request.
    pub async fn send_streaming(self) -> Result<Response> {
        let policy = self.client.redirect_policy.clone();
        if matches!(policy, RedirectPolicy::None) {
            return self.send_streaming_once().await;
        }

        if self.body.is_streaming() {
            let mut response = self.send_streaming_once().await?;
            if response.is_redirect() {
                // Drain the redirect body before reporting the error.
                drain_streaming_body(response.body_mut()).await?;
                return Err(Error::HttpProtocol(
                    "redirect would require replaying a non-replayable streaming request body"
                        .into(),
                ));
            }
            return Ok(response);
        }

        let client = self.client;
        let mut request = self.build()?;
        // Number of redirects followed so far.
        let mut redirects = 0u32;

        loop {
            // `send_streaming_once` consumes its builder, so a fresh one is
            // assembled from the current request on every iteration.
            let builder = RequestBuilder {
                client,
                url: Some(request.url.clone()),
                method: request.method.clone(),
                headers: request.headers.clone(),
                body: request.body.clone(),
                version: request.version,
                timeout: request.timeout,
                error: None,
            };

            let mut response = builder.send_streaming_once().await?;

            if !response.is_redirect() {
                return Ok(response);
            }

            // A redirect status without a usable target is handed back to the caller.
            let location = match response.redirect_url() {
                Some(value) => value.to_string(),
                None => return Ok(response),
            };

            // The limit is checked before following, so at most `limit`
            // redirects are followed.
            if let RedirectPolicy::Limited(limit) = policy {
                if redirects >= limit {
                    return Err(Error::RedirectLimit { count: limit });
                }
            }

            // Drain the redirect body before issuing the follow-up request.
            drain_streaming_body(response.body_mut()).await?;

            // Resolve the redirect target relative to the current request URL.
            let next_url = request.url.join(&location).map_err(Error::from)?;
            request = client.redirect_request(&request, &response, next_url)?;
            redirects += 1;
        }
    }
921
922 async fn send_streaming_once(self) -> Result<Response> {
923 let client = self.client.clone();
924 let mut request = self.build()?;
925 let mut timeouts = client.timeouts.clone();
926 if let Some(total) = request.timeout {
927 timeouts.total = Some(total);
928 }
929
930 client
931 .apply_cookie_header_for_url(request.url.as_str().to_string(), &mut request.headers)
932 .await;
933
934 let version = request.version.unwrap_or(client.default_version);
935
936 if matches!(version, HttpVersion::Http3 | HttpVersion::Http3Only) {
937 if let Some(content_length) = request.body.content_length() {
938 if content_length > 0 && !request.headers.contains("content-length") {
939 request
940 .headers
941 .insert("Content-Length", content_length.to_string());
942 }
943 }
944 let body = if request.body.is_streaming() {
945 std::mem::take(&mut request.body)
946 } else {
947 request.body.clone()
948 };
949 let body_timeouts = crate::transport::h3::H3BodyTimeouts {
950 read_idle: timeouts.read_idle,
951 total: timeouts.total,
952 };
953
954 let fut = client.h3_client.send_streaming_with_timeouts(
955 request.url.as_str(),
956 request.method.as_str(),
957 &request.headers,
958 body,
959 body_timeouts,
960 );
961
962 let response = if let Some(ttfb_timeout) = timeouts.ttfb {
963 tokio_timeout(ttfb_timeout, fut)
964 .await
965 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))??
966 } else if let Some(total_timeout) = timeouts.total {
967 tokio_timeout(total_timeout, fut)
968 .await
969 .map_err(|_| Error::TotalTimeout(total_timeout))??
970 } else {
971 fut.await?
972 };
973
974 let request_url = request.url.clone();
975 let response = response.with_url(request_url.clone());
976
977 let response_headers = response.headers().clone();
978 client
979 .store_cookies_from_headers(response_headers, request_url.as_str().to_string())
980 .await;
981
982 if let Some(enc) = response.content_encoding() {
983 let enc_lc = enc.to_lowercase();
984 if enc_lc.contains("gzip")
985 || enc_lc.contains("deflate")
986 || enc_lc.contains("br")
987 || enc_lc.contains("zstd")
988 {
989 return Err(Error::Decompression(
990 "Compressed streaming is unsupported".into(),
991 ));
992 }
993 }
994
995 return Ok(response);
996 }
997
998 let uri: Uri = request
1000 .url
1001 .as_str()
1002 .parse()
1003 .map_err(|e| Error::HttpProtocol(format!("Invalid URI: {}", e)))?;
1004
1005 let request_url = request.url.clone();
1006 let prefer_http2 = match version {
1007 HttpVersion::Http1_1 => false,
1008 HttpVersion::Http2 => true,
1009 HttpVersion::Auto => matches!(client.default_version, HttpVersion::Http2),
1010 HttpVersion::Http3 | HttpVersion::Http3Only => unreachable!(),
1011 };
1012 let pool_key = client.make_pool_key(&uri);
1013
1014 let response = if !prefer_http2 {
1015 let h1_slot = client
1016 .acquire_h1_connection_slot(&pool_key, &timeouts)
1017 .await?;
1018 let pooled_h1_stream = client.h1_pool.get_h1(&pool_key).await;
1019 if pooled_h1_stream.is_some() {
1020 client.pool_reuse_counter.fetch_add(1, Ordering::Relaxed);
1021 }
1022 let connector = client.connector_for_uri(&uri);
1023 let method = request.method.clone();
1024 let headers = request.headers.clone();
1025 let body = request.body;
1026 let use_early_data = client.http_tls_early_data
1027 && uri.scheme_str() == Some("https")
1028 && is_zero_rtt_safe_request(method.as_str(), &body);
1029
1030 let (stream, early_outcome) = if let Some(stream) = pooled_h1_stream {
1031 (stream, EarlyDataOutcome::NotAttempted)
1032 } else {
1033 let connect_result = if use_early_data {
1034 let body_kind = h1_request_body_kind(&body);
1035 let request_head =
1036 H1Connection::build_request_bytes(&method, &uri, &headers, body_kind)?;
1037 let connect_fut = connector.connect_with_alpn_and_early_data(
1038 &uri,
1039 AlpnMode::Http1Only,
1040 Some(&request_head),
1041 );
1042 if let Some(connect_timeout) = timeouts.connect {
1043 tokio_timeout(connect_timeout, connect_fut)
1044 .await
1045 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1046 } else {
1047 connect_fut.await?
1048 }
1049 } else {
1050 let connect_fut = connector.connect_h1_only(&uri);
1051 let stream = if let Some(connect_timeout) = timeouts.connect {
1052 tokio_timeout(connect_timeout, connect_fut)
1053 .await
1054 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1055 } else {
1056 connect_fut.await?
1057 };
1058 (stream, EarlyDataOutcome::NotAttempted)
1059 };
1060 connect_result
1061 };
1062
1063 let request_head_sent = matches!(
1064 early_outcome,
1065 EarlyDataOutcome::Accepted | EarlyDataOutcome::Rejected { .. }
1066 );
1067
1068 let h1_pool = client.h1_pool.clone();
1069 let pool_key_for_reuse = pool_key.clone();
1070 let on_reusable: crate::transport::h1::H1ReuseHook = Box::new(move |stream| {
1071 let _h1_slot = h1_slot;
1072 let _ = h1_pool.try_put_h1(pool_key_for_reuse, stream);
1073 });
1074 let conn = H1Connection::new(stream);
1075 let send_fut = conn.send_request_streaming(
1076 method,
1077 &uri,
1078 &headers,
1079 body,
1080 H1StreamingOptions {
1081 on_reusable,
1082 read_idle_timeout: timeouts.read_idle,
1083 total_timeout: timeouts.total,
1084 request_head_sent,
1085 },
1086 );
1087 let response = if let Some(ttfb_timeout) = timeouts.ttfb {
1088 tokio_timeout(ttfb_timeout, send_fut)
1089 .await
1090 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))??
1091 } else {
1092 send_fut.await?
1093 };
1094
1095 let response_headers = response.headers().clone();
1096 client
1097 .store_cookies_from_headers(response_headers, request_url.as_str().to_string())
1098 .await;
1099 let response = response.with_url(request_url);
1100 reject_compressed_streaming(&response)?;
1101 return Ok(response);
1102 } else {
1103 if let Some(content_length) = request.body.content_length() {
1104 if content_length > 0 && !request.headers.contains("content-length") {
1105 request
1106 .headers
1107 .insert("Content-Length", content_length.to_string());
1108 }
1109 }
1110 let body_timeouts = H2BodyTimeouts {
1111 read_idle: timeouts.read_idle,
1112 total: timeouts.total,
1113 };
1114 let pooled = {
1116 let mut pool = client.h2_pool.write().await;
1117 if let Some(conn) = pool.get(&pool_key) {
1118 if conn.is_alive() {
1119 Some(conn.clone())
1120 } else {
1121 pool.remove(&pool_key);
1122 None
1123 }
1124 } else {
1125 None
1126 }
1127 };
1128
1129 if let Some(conn) = pooled {
1130 client.pool_reuse_counter.fetch_add(1, Ordering::Relaxed);
1131 let streaming_body = request.body.is_streaming();
1132 let body = if streaming_body {
1133 std::mem::take(&mut request.body)
1134 } else {
1135 request.body.clone()
1136 };
1137
1138 let send_fut = conn.send_streaming_request(
1139 request.method.clone(),
1140 &uri,
1141 &request.headers,
1142 body,
1143 body_timeouts,
1144 );
1145 let res = if let Some(ttfb_timeout) = timeouts.ttfb {
1146 tokio_timeout(ttfb_timeout, send_fut)
1147 .await
1148 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))?
1149 } else {
1150 send_fut.await
1151 };
1152
1153 match res {
1154 Ok(response) => {
1155 let response = response.with_url(request_url.clone());
1156 let response_headers = response.headers().clone();
1157 client
1158 .store_cookies_from_headers(
1159 response_headers,
1160 request_url.as_str().to_string(),
1161 )
1162 .await;
1163 response
1164 }
1165 Err(e) => {
1166 if streaming_body {
1167 return Err(e);
1168 }
1169 tracing::debug!(
1170 "Pooled HTTP/2 connection failed for streaming, creating new: {}",
1171 e
1172 );
1173 let mut pool = client.h2_pool.write().await;
1174 pool.remove(&pool_key);
1175 drop(pool);
1176
1177 let connector = client.connector_for_uri(&uri);
1178 let connect_fut = connector.connect(&uri);
1179 let stream = if let Some(connect_timeout) = timeouts.connect {
1180 tokio_timeout(connect_timeout, connect_fut)
1181 .await
1182 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1183 } else {
1184 connect_fut.await?
1185 };
1186
1187 let alpn = stream.alpn_protocol();
1188 if !alpn.is_h2() {
1189 return Err(Error::HttpProtocol(format!(
1190 "Expected h2 ALPN, got {:?}",
1191 alpn
1192 )));
1193 }
1194
1195 let h2_connect_fut = H2Connection::connect(
1196 stream,
1197 client.http2_settings.clone(),
1198 client.pseudo_order,
1199 );
1200 let h2_conn = if let Some(connect_timeout) = timeouts.connect {
1201 tokio_timeout(connect_timeout, h2_connect_fut)
1202 .await
1203 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1204 } else {
1205 h2_connect_fut.await?
1206 };
1207
1208 let pooled_conn = H2PooledConnection::new_with_config(
1209 h2_conn,
1210 client.h2_transport_config.clone(),
1211 );
1212 {
1213 let mut pool = client.h2_pool.write().await;
1214 pool.insert(pool_key.clone(), pooled_conn.clone());
1215 }
1216
1217 let send_fut = pooled_conn.send_streaming_request(
1218 request.method.clone(),
1219 &uri,
1220 &request.headers,
1221 request.body.clone(),
1222 body_timeouts,
1223 );
1224 let response = if let Some(ttfb_timeout) = timeouts.ttfb {
1225 tokio_timeout(ttfb_timeout, send_fut)
1226 .await
1227 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))??
1228 } else {
1229 send_fut.await?
1230 };
1231
1232 let response = response.with_url(request_url.clone());
1233 let response_headers = response.headers().clone();
1234 client
1235 .store_cookies_from_headers(
1236 response_headers,
1237 request_url.as_str().to_string(),
1238 )
1239 .await;
1240 response
1241 }
1242 }
1243 } else if client.h2_direct_streaming_responses && request.body.is_empty() {
1244 let response = client
1245 .send_h2_direct_streaming_response(
1246 request.method.clone(),
1247 &uri,
1248 request.headers.clone(),
1249 &pool_key,
1250 &timeouts,
1251 body_timeouts,
1252 )
1253 .await?;
1254
1255 let response = response.with_url(request_url.clone());
1256 let response_headers = response.headers().clone();
1257 client
1258 .store_cookies_from_headers(response_headers, request_url.as_str().to_string())
1259 .await;
1260 response
1261 } else {
1262 let connector = client.connector_for_uri(&uri);
1263 let connect_fut = connector.connect(&uri);
1264 let stream = if let Some(connect_timeout) = timeouts.connect {
1265 tokio_timeout(connect_timeout, connect_fut)
1266 .await
1267 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1268 } else {
1269 connect_fut.await?
1270 };
1271
1272 let alpn = stream.alpn_protocol();
1273 if !alpn.is_h2() {
1274 return Err(Error::HttpProtocol(format!(
1275 "Expected h2 ALPN, got {:?}",
1276 alpn
1277 )));
1278 }
1279
1280 let h2_connect_fut = H2Connection::connect(
1281 stream,
1282 client.http2_settings.clone(),
1283 client.pseudo_order,
1284 );
1285 let h2_conn = if let Some(connect_timeout) = timeouts.connect {
1286 tokio_timeout(connect_timeout, h2_connect_fut)
1287 .await
1288 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1289 } else {
1290 h2_connect_fut.await?
1291 };
1292
1293 let pooled_conn = H2PooledConnection::new_with_config(
1294 h2_conn,
1295 client.h2_transport_config.clone(),
1296 );
1297 {
1298 let mut pool = client.h2_pool.write().await;
1299 pool.insert(pool_key.clone(), pooled_conn.clone());
1300 }
1301
1302 let body = std::mem::take(&mut request.body);
1303
1304 let send_fut = pooled_conn.send_streaming_request(
1305 request.method.clone(),
1306 &uri,
1307 &request.headers,
1308 body,
1309 body_timeouts,
1310 );
1311 let response = if let Some(ttfb_timeout) = timeouts.ttfb {
1312 tokio_timeout(ttfb_timeout, send_fut)
1313 .await
1314 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))??
1315 } else {
1316 send_fut.await?
1317 };
1318
1319 let response = response.with_url(request_url.clone());
1320 let response_headers = response.headers().clone();
1321 client
1322 .store_cookies_from_headers(response_headers, request_url.as_str().to_string())
1323 .await;
1324 response
1325 }
1326 };
1327
1328 reject_compressed_streaming(&response)?;
1329 Ok(response)
1330 }
1331}
1332
1333fn reject_compressed_streaming(response: &Response) -> Result<()> {
1334 if let Some(enc) = response.content_encoding() {
1335 let enc_lc = enc.to_lowercase();
1336 if enc_lc.contains("gzip")
1337 || enc_lc.contains("deflate")
1338 || enc_lc.contains("br")
1339 || enc_lc.contains("zstd")
1340 {
1341 return Err(Error::Decompression(
1342 "Compressed streaming is unsupported".into(),
1343 ));
1344 }
1345 }
1346 Ok(())
1347}
1348
1349async fn drain_streaming_body(body: &mut Body) -> Result<()> {
1350 while let Some(frame) = body.frame().await {
1351 let _ = frame?;
1352 }
1353 Ok(())
1354}
1355
1356impl Client {
1357 pub async fn execute(&self, mut request: Request) -> Result<Response> {
1359 if request.body.is_streaming() {
1360 return Err(Error::HttpProtocol(
1361 "streaming request bodies require send_streaming()".into(),
1362 ));
1363 }
1364
1365 let policy = self.redirect_policy.clone();
1366 let mut redirects = 0u32;
1367
1368 loop {
1369 let mut headers = request.headers.clone();
1370 let cookie_injected = self
1371 .apply_cookie_header_for_url(request.url.as_str().to_string(), &mut headers)
1372 .await;
1373 request.headers = headers;
1374
1375 let mut timeouts = self.timeouts.clone();
1376 if let Some(total) = request.timeout {
1377 timeouts.total = Some(total);
1378 }
1379
1380 let response = self
1381 .execute_once(request.clone(), &timeouts)
1382 .await?
1383 .into_buffered()
1384 .await?;
1385
1386 let response_headers = response.headers().clone();
1387 self.store_cookies_from_headers(response_headers, request.url.as_str().to_string())
1388 .await;
1389
1390 if matches!(policy, RedirectPolicy::None) || !response.is_redirect() {
1391 return Ok(response);
1392 }
1393
1394 let location = match response.redirect_url() {
1395 Some(value) => value.to_string(),
1396 None => return Ok(response),
1397 };
1398
1399 if let RedirectPolicy::Limited(limit) = policy {
1400 if redirects >= limit {
1401 return Err(Error::RedirectLimit { count: limit });
1402 }
1403 }
1404
1405 let next_url = request.url.join(&location).map_err(Error::from)?;
1406 let mut next_request = self.redirect_request(&request, &response, next_url)?;
1407
1408 if cookie_injected {
1409 next_request.headers.remove("cookie");
1410 }
1411
1412 request = next_request;
1413 redirects += 1;
1414 }
1415 }
1416
1417 async fn execute_once(&self, request: Request, timeouts: &Timeouts) -> Result<Response> {
1418 let version = request.version.unwrap_or(self.default_version);
1419
1420 if matches!(version, HttpVersion::Http3Only) {
1422 return self
1423 .send_h3_for_url(request.clone(), request.url.clone(), timeouts)
1424 .await;
1425 }
1426
1427 if matches!(version, HttpVersion::Http3) {
1429 match self
1430 .send_h3_for_url(request.clone(), request.url.clone(), timeouts)
1431 .await
1432 {
1433 Ok(response) => return Ok(response),
1434 Err(e) => {
1435 tracing::debug!("HTTP/3 failed, falling back to HTTP/1.1 or HTTP/2: {}", e);
1436 }
1438 }
1439 }
1440
1441 if matches!(version, HttpVersion::Auto) && self.h3_upgrade_enabled {
1443 let origin = Self::origin_for_url(&request.url);
1444 if let Some(alt_svc) = self.alt_svc_cache.get_h3_alternative(&origin).await {
1445 tracing::debug!(
1446 "Alt-Svc indicates HTTP/3 support for {}, attempting upgrade",
1447 origin
1448 );
1449
1450 let mut h3_url = request.url.clone();
1451 let _ = h3_url.set_scheme("https");
1452 if let Some(ref host) = alt_svc.host {
1453 h3_url
1454 .set_host(Some(host))
1455 .map_err(|_| Error::HttpProtocol("Invalid Alt-Svc host".into()))?;
1456 }
1457 let _ = h3_url.set_port(Some(alt_svc.port));
1458
1459 match self
1460 .send_h3_for_url(request.clone(), h3_url.clone(), timeouts)
1461 .await
1462 {
1463 Ok(response) => return Ok(response.with_url(h3_url)),
1464 Err(e) => {
1465 tracing::debug!("HTTP/3 upgrade failed, using HTTP/1.1 or HTTP/2: {}", e);
1466 }
1468 }
1469 }
1470 }
1471
1472 self.send_h1_h2(request, version, timeouts).await
1474 }
1475
1476 async fn send_h3_for_url(
1477 &self,
1478 request: Request,
1479 url: Url,
1480 timeouts: &Timeouts,
1481 ) -> Result<Response> {
1482 let body = if request.body.is_empty() {
1483 None
1484 } else {
1485 Some(request.body.clone().into_bytes()?.to_vec())
1486 };
1487
1488 let fut = self.h3_client.send_request(
1489 url.as_str(),
1490 request.method.as_str(),
1491 &request.headers,
1492 body,
1493 );
1494
1495 let response = if let Some(total_timeout) = timeouts.total {
1497 tokio_timeout(total_timeout, fut)
1498 .await
1499 .map_err(|_| Error::TotalTimeout(total_timeout))??
1500 } else {
1501 fut.await?
1502 };
1503
1504 Ok(response.with_url(url))
1505 }
1506
1507 async fn send_h1_h2(
1508 &self,
1509 request: Request,
1510 version: HttpVersion,
1511 timeouts: &Timeouts,
1512 ) -> Result<Response> {
1513 let request_url = request.url.clone();
1515
1516 let uri: Uri = request
1518 .url
1519 .as_str()
1520 .parse()
1521 .map_err(|e| Error::HttpProtocol(format!("Invalid URI: {}", e)))?;
1522
1523 let prefer_http2 = match version {
1525 HttpVersion::Http1_1 => false,
1526 HttpVersion::Http2 => true,
1527 HttpVersion::Http3 | HttpVersion::Http3Only => {
1528 return Err(Error::HttpProtocol("HTTP/3 should use send_h3".into()));
1529 }
1530 HttpVersion::Auto => matches!(self.default_version, HttpVersion::Http2),
1531 };
1532
1533 let h3_upgrade_enabled = self.h3_upgrade_enabled;
1535 let alt_svc_cache = self.alt_svc_cache.clone();
1536 let origin = Self::origin_for_url(&request.url);
1537
1538 let headers_vec = request.headers.clone();
1539 let body_bytes = if request.body.is_empty() {
1540 None
1541 } else {
1542 Some(request.body.clone().into_bytes()?)
1543 };
1544
1545 if prefer_http2 {
1547 let pool_key = self.make_pool_key(&uri);
1548
1549 let pooled = {
1551 let mut pool = self.h2_pool.write().await;
1552 if let Some(conn) = pool.get(&pool_key) {
1553 if conn.is_alive() {
1554 Some(conn.clone())
1555 } else {
1556 pool.remove(&pool_key);
1557 None
1558 }
1559 } else {
1560 None
1561 }
1562 };
1563
1564 if let Some(conn) = pooled {
1565 self.pool_reuse_counter.fetch_add(1, Ordering::Relaxed);
1566 let result = conn
1568 .send_request(
1569 request.method.clone(),
1570 &uri,
1571 &headers_vec,
1572 body_bytes.clone(),
1573 )
1574 .await;
1575
1576 match result {
1577 Ok(response) => {
1578 if h3_upgrade_enabled {
1580 if let Some(alt_svc) = response.get_header("alt-svc") {
1581 alt_svc_cache.parse_and_store(&origin, alt_svc).await;
1582 }
1583 }
1584 return Ok(response.with_url(request_url));
1585 }
1586 Err(e) => {
1587 tracing::debug!("Pooled HTTP/2 connection failed, creating new: {}", e);
1589 let mut pool = self.h2_pool.write().await;
1590 pool.remove(&pool_key);
1591 }
1592 }
1593 }
1594
1595 let connector = self.connector_for_uri(&uri);
1598 let connect_fut = connector.connect(&uri);
1599 let stream = if let Some(connect_timeout) = timeouts.connect {
1600 tokio_timeout(connect_timeout, connect_fut)
1601 .await
1602 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1603 } else {
1604 connect_fut.await?
1605 };
1606
1607 let use_http2 = if self.http2_prior_knowledge && !stream.alpn_protocol().is_h2() {
1609 true
1611 } else if let MaybeHttpsStream::Https(ref ssl_stream) = stream {
1612 ssl_stream.ssl().selected_alpn_protocol() == Some(b"h2")
1613 } else {
1614 false
1615 };
1616
1617 if use_http2 {
1618 let h2_conn =
1620 H2Connection::connect(stream, self.http2_settings.clone(), self.pseudo_order)
1621 .await?;
1622 let pooled_conn =
1623 H2PooledConnection::new_with_config(h2_conn, self.h2_transport_config.clone());
1624
1625 {
1627 let mut pool = self.h2_pool.write().await;
1628 pool.insert(pool_key, pooled_conn.clone());
1629 }
1630
1631 let fut = pooled_conn.send_request(
1633 request.method.clone(),
1634 &uri,
1635 &headers_vec,
1636 body_bytes.clone(),
1637 );
1638
1639 let response = if let Some(ttfb_timeout) = timeouts.ttfb {
1640 tokio_timeout(ttfb_timeout, fut)
1641 .await
1642 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))?
1643 } else {
1644 fut.await
1645 }?;
1646
1647 if h3_upgrade_enabled {
1649 if let Some(alt_svc) = response.get_header("alt-svc") {
1650 alt_svc_cache.parse_and_store(&origin, alt_svc).await;
1651 }
1652 }
1653
1654 return Ok(response.with_url(request_url));
1655 }
1656 }
1658
1659 let pool_key = self.make_pool_key(&uri);
1661 let h1_slot = self.acquire_h1_connection_slot(&pool_key, timeouts).await?;
1662
1663 let mut stream_opt = self.h1_pool.get_h1(&pool_key).await;
1665 let mut used_pooled = stream_opt.is_some();
1666 if used_pooled {
1667 self.pool_reuse_counter.fetch_add(1, Ordering::Relaxed);
1668 }
1669
1670 let mut stream = if let Some(pooled_stream) = stream_opt.take() {
1672 tracing::debug!("H1: Reusing pooled connection for {:?}", pool_key);
1673 pooled_stream
1674 } else {
1675 tracing::debug!("H1: Creating new connection for {:?}", pool_key);
1676 let connector = self.connector_for_uri(&uri);
1678 let connect_fut = connector.connect(&uri);
1679 if let Some(connect_timeout) = timeouts.connect {
1680 tokio_timeout(connect_timeout, connect_fut)
1681 .await
1682 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1683 } else {
1684 connect_fut.await?
1685 }
1686 };
1687
1688 let server_wants_h2 = if let MaybeHttpsStream::Https(ref ssl_stream) = stream {
1691 ssl_stream.ssl().selected_alpn_protocol() == Some(b"h2")
1692 } else {
1693 false
1694 };
1695
1696 let response = if server_wants_h2 {
1697 drop(h1_slot);
1698 tracing::debug!("Server selected h2 via ALPN, upgrading to HTTP/2");
1700
1701 let h2_conn =
1702 H2Connection::connect(stream, self.http2_settings.clone(), self.pseudo_order)
1703 .await?;
1704 let pooled_conn =
1705 H2PooledConnection::new_with_config(h2_conn, self.h2_transport_config.clone());
1706
1707 {
1709 let mut pool = self.h2_pool.write().await;
1710 pool.insert(pool_key, pooled_conn.clone());
1711 }
1712
1713 let fut = pooled_conn.send_request(
1715 request.method.clone(),
1716 &uri,
1717 &headers_vec,
1718 body_bytes.clone(),
1719 );
1720
1721 if let Some(ttfb_timeout) = timeouts.ttfb {
1722 tokio_timeout(ttfb_timeout, fut)
1723 .await
1724 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))?
1725 } else {
1726 fut.await
1727 }?
1728 } else {
1729 let _h1_slot = h1_slot;
1730 let result = loop {
1734 let stream_for_request = stream;
1735 let body_bytes = body_bytes.clone();
1736 let fut = Self::do_send_http1(
1737 stream_for_request,
1738 request.method.clone(),
1739 &uri,
1740 request.headers.clone(),
1741 body_bytes.clone(),
1742 );
1743
1744 let request_result = if let Some(ttfb_timeout) = timeouts.ttfb {
1746 tokio_timeout(ttfb_timeout, fut)
1747 .await
1748 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))?
1749 } else {
1750 fut.await
1751 };
1752
1753 match request_result {
1754 Ok((resp, returned_stream)) => {
1755 self.h1_pool.put_h1(pool_key.clone(), returned_stream).await;
1757 break Ok(resp);
1758 }
1759 Err(e) => {
1760 if used_pooled {
1762 tracing::debug!(
1763 "H1: Pooled connection failed for {:?}, creating new: {}",
1764 pool_key,
1765 e
1766 );
1767 let connector = self.connector_for_uri(&uri);
1769 let connect_fut = connector.connect(&uri);
1770 stream = if let Some(connect_timeout) = timeouts.connect {
1771 tokio_timeout(connect_timeout, connect_fut)
1772 .await
1773 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1774 } else {
1775 connect_fut.await?
1776 };
1777 used_pooled = false; continue;
1779 } else {
1780 tracing::debug!(
1782 "H1: Request failed for {:?}, discarding connection: {}",
1783 pool_key,
1784 e
1785 );
1786 break Err(e);
1787 }
1788 }
1789 }
1790 };
1791
1792 result?
1793 };
1794
1795 if h3_upgrade_enabled {
1797 if let Some(alt_svc) = response.get_header("alt-svc") {
1798 alt_svc_cache.parse_and_store(&origin, alt_svc).await;
1799 }
1800 }
1801
1802 Ok(response.with_url(request_url))
1803 }
1804
1805 fn redirect_request(
1806 &self,
1807 request: &Request,
1808 response: &Response,
1809 next_url: Url,
1810 ) -> Result<Request> {
1811 let status = response.status().as_u16();
1812 let mut method = request.method.clone();
1813 let mut headers = request.headers.clone();
1814
1815 let should_switch = status == 303
1816 || ((status == 301 || status == 302) && !matches!(method, Method::GET | Method::HEAD));
1817
1818 let body = if should_switch {
1819 method = Method::GET;
1820 headers.remove("content-length");
1821 headers.remove("content-type");
1822 RequestBody::Empty
1823 } else if request.body.is_streaming() {
1824 return Err(Error::HttpProtocol(
1825 "redirect would require replaying a non-replayable streaming request body".into(),
1826 ));
1827 } else {
1828 request.body.clone()
1829 };
1830
1831 if Self::is_cross_origin(&request.url, &next_url) {
1832 headers.remove("authorization");
1833 }
1834
1835 Ok(Request {
1836 method,
1837 url: next_url,
1838 headers,
1839 body,
1840 version: request.version,
1841 timeout: request.timeout,
1842 })
1843 }
1844
1845 async fn apply_cookie_header_for_url(
1846 &self,
1847 request_url: String,
1848 headers: &mut Headers,
1849 ) -> bool {
1850 let Some(jar) = &self.cookie_store else {
1851 return false;
1852 };
1853 if headers.contains("cookie") {
1854 return false;
1855 }
1856
1857 let cookie_header = jar.read().await.build_cookie_header(&request_url);
1858 if let Some(cookie_header) = cookie_header {
1859 headers.insert("Cookie", cookie_header);
1860 return true;
1861 }
1862 false
1863 }
1864
1865 async fn store_cookies_from_headers(&self, headers: Headers, request_url: String) {
1866 if let Some(jar) = &self.cookie_store {
1867 jar.write().await.store_from_headers(&headers, &request_url);
1868 }
1869 }
1870
1871 fn make_pool_key(&self, uri: &Uri) -> PoolKey {
1873 let host = uri.host().unwrap_or("localhost").to_string();
1874 let is_https = uri.scheme_str() == Some("https");
1875 let port = uri.port_u16().unwrap_or(if is_https { 443 } else { 80 });
1876 PoolKey::new(host, port, is_https, self.fingerprint, self.pseudo_order)
1877 }
1878
1879 fn take_h2_direct_connection(
1880 &self,
1881 pool_key: &PoolKey,
1882 ) -> Option<RawH2Connection<MaybeHttpsStream>> {
1883 let mut pool = self
1884 .h2_direct_pool
1885 .lock()
1886 .expect("H2 direct pool mutex poisoned");
1887 let conn = pool.get_mut(pool_key).and_then(Vec::pop);
1888 if pool.get(pool_key).is_some_and(Vec::is_empty) {
1889 pool.remove(pool_key);
1890 }
1891 conn
1892 }
1893
1894 fn h2_direct_reuse_hook(&self, pool_key: PoolKey) -> H2DirectReuseHook {
1895 let pool = self.h2_direct_pool.clone();
1896 Box::new(move |conn| {
1897 if !conn.is_reusable() {
1898 return;
1899 }
1900 let mut guard = pool.lock().expect("H2 direct pool mutex poisoned");
1901 let entry = guard.entry(pool_key).or_default();
1902 if entry.is_empty() {
1903 entry.push(conn);
1904 }
1905 })
1906 }
1907
1908 async fn connect_h2_direct_connection(
1909 &self,
1910 uri: &Uri,
1911 timeouts: &Timeouts,
1912 ) -> Result<RawH2Connection<MaybeHttpsStream>> {
1913 let connector = self.connector_for_uri(uri);
1914 let connect_fut = connector.connect(uri);
1915 let stream = if let Some(connect_timeout) = timeouts.connect {
1916 tokio_timeout(connect_timeout, connect_fut)
1917 .await
1918 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1919 } else {
1920 connect_fut.await?
1921 };
1922
1923 let use_http2 = if self.http2_prior_knowledge && !stream.alpn_protocol().is_h2() {
1924 true
1925 } else if let MaybeHttpsStream::Https(ref ssl_stream) = stream {
1926 ssl_stream.ssl().selected_alpn_protocol() == Some(b"h2")
1927 } else {
1928 false
1929 };
1930
1931 if !use_http2 {
1932 return Err(Error::HttpProtocol(format!(
1933 "Expected h2 ALPN, got {:?}",
1934 stream.alpn_protocol()
1935 )));
1936 }
1937
1938 let h2_connect_fut =
1939 RawH2Connection::connect(stream, self.http2_settings.clone(), self.pseudo_order);
1940 if let Some(connect_timeout) = timeouts.connect {
1941 tokio_timeout(connect_timeout, h2_connect_fut)
1942 .await
1943 .map_err(|_| Error::ConnectTimeout(connect_timeout))?
1944 } else {
1945 h2_connect_fut.await
1946 }
1947 }
1948
1949 async fn start_h2_direct_response(&self, request: H2DirectResponseRequest) -> Result<Response> {
1950 let H2DirectResponseRequest {
1951 conn,
1952 method,
1953 uri,
1954 headers,
1955 body_timeouts,
1956 pool_key,
1957 ttfb_timeout,
1958 } = request;
1959 let fut = async move {
1960 let mut conn = conn;
1961 let stream_id = conn.send_headers_raw(&method, &uri, &headers, true).await?;
1962 let (status, headers, end_stream) = conn
1963 .read_response_headers_with_end_stream(stream_id)
1964 .await?;
1965 Ok::<_, Error>(H2DirectStart {
1966 conn,
1967 stream_id,
1968 status: status.as_u16(),
1969 headers,
1970 end_stream,
1971 })
1972 };
1973
1974 let mut started = if let Some(timeout) = ttfb_timeout {
1975 tokio_timeout(timeout, fut)
1976 .await
1977 .map_err(|_| Error::TtfbTimeout(timeout))??
1978 } else {
1979 fut.await?
1980 };
1981
1982 if started.end_stream {
1983 started.conn.remove_stream(started.stream_id);
1984 let on_reusable = self.h2_direct_reuse_hook(pool_key);
1985 on_reusable(started.conn);
1986 return Ok(Response::with_body(
1987 started.status,
1988 Headers::from(started.headers),
1989 Body::empty(),
1990 "HTTP/2".to_string(),
1991 ));
1992 }
1993
1994 let on_reusable = self.h2_direct_reuse_hook(pool_key);
1995 Ok(Response::with_body(
1996 started.status,
1997 Headers::from(started.headers),
1998 Body::from_h2_direct(H2DirectBody::new(
1999 started.conn,
2000 started.stream_id,
2001 body_timeouts,
2002 on_reusable,
2003 )),
2004 "HTTP/2".to_string(),
2005 ))
2006 }
2007
2008 async fn send_h2_direct_streaming_response(
2009 &self,
2010 method: Method,
2011 uri: &Uri,
2012 headers: Headers,
2013 pool_key: &PoolKey,
2014 timeouts: &Timeouts,
2015 body_timeouts: H2BodyTimeouts,
2016 ) -> Result<Response> {
2017 if let Some(conn) = self.take_h2_direct_connection(pool_key) {
2018 self.pool_reuse_counter.fetch_add(1, Ordering::Relaxed);
2019 match self
2020 .start_h2_direct_response(H2DirectResponseRequest {
2021 conn,
2022 method: method.clone(),
2023 uri: uri.clone(),
2024 headers: headers.clone(),
2025 body_timeouts,
2026 pool_key: pool_key.clone(),
2027 ttfb_timeout: timeouts.ttfb,
2028 })
2029 .await
2030 {
2031 Ok(response) => return Ok(response),
2032 Err(error) => {
2033 tracing::debug!(
2034 "Pooled direct HTTP/2 streaming connection failed, reconnecting: {}",
2035 error
2036 );
2037 }
2038 }
2039 }
2040
2041 let conn = self.connect_h2_direct_connection(uri, timeouts).await?;
2042 self.start_h2_direct_response(H2DirectResponseRequest {
2043 conn,
2044 method,
2045 uri: uri.clone(),
2046 headers,
2047 body_timeouts,
2048 pool_key: pool_key.clone(),
2049 ttfb_timeout: timeouts.ttfb,
2050 })
2051 .await
2052 }
2053
2054 async fn do_send_http1(
2055 stream: MaybeHttpsStream,
2056 method: Method,
2057 uri: &Uri,
2058 headers: Headers,
2059 body: Option<Bytes>,
2060 ) -> Result<(Response, MaybeHttpsStream)> {
2061 let mut conn = H1Connection::new(stream);
2062 let response = conn.send_request(method, uri, &headers, body).await?;
2063 let stream = conn.into_inner();
2064 Ok((response, stream))
2065 }
2066
2067 fn origin_for_url(url: &Url) -> String {
2069 let scheme = url.scheme();
2070 let host = url.host_str().unwrap_or("localhost");
2071 let port = url
2072 .port_or_known_default()
2073 .unwrap_or(if scheme == "https" { 443 } else { 80 });
2074
2075 if (scheme == "https" && port == 443) || (scheme == "http" && port == 80) {
2076 format!("{}://{}", scheme, host)
2077 } else {
2078 format!("{}://{}:{}", scheme, host, port)
2079 }
2080 }
2081
2082 fn is_cross_origin(a: &Url, b: &Url) -> bool {
2083 a.scheme() != b.scheme()
2084 || a.host_str() != b.host_str()
2085 || a.port_or_known_default() != b.port_or_known_default()
2086 }
2087}
2088
2089impl ClientBuilder {
2090 pub fn new() -> Self {
2099 Self {
2100 fingerprint: FingerprintProfile::default(),
2101 http2_settings: None,
2102 pseudo_order: None,
2103 timeouts: Timeouts::default(),
2104 dns_config: DnsConfig::new(),
2105 pool_idle_timeout: Duration::from_secs(30),
2106 pool_max_idle_per_host: 6,
2107 h1_max_connections_per_origin: 6,
2108 h3_max_idle_timeout: None,
2109 h3_fingerprint: None,
2110 h3_backend: H3Backend::Native,
2111 h3_transport_config: H3TransportConfig::default(),
2112 h2_transport_config: H2TransportConfig::default(),
2113 h2_direct_streaming_responses: false,
2114 tcp_keepalive: None,
2115 tcp_keepalive_interval: None,
2116 tcp_keepalive_retries: None,
2117 tcp_fingerprint: None,
2118 prefer_http2: true, h3_upgrade_enabled: true, http2_prior_knowledge: false,
2121 root_certs: Vec::new(),
2122 use_platform_roots: false,
2123 danger_accept_invalid_certs: false,
2124 localhost_allows_invalid_certs: true, default_headers: Headers::new(),
2126 redirect_policy: RedirectPolicy::None,
2127 cookie_store: None,
2128 http_tls_early_data: false,
2129 }
2130 }
2131
2132 pub fn fingerprint(mut self, fingerprint: FingerprintProfile) -> Self {
2134 self.fingerprint = fingerprint;
2135 self
2136 }
2137
2138 pub fn http2_settings(mut self, settings: Http2Settings) -> Self {
2140 self.http2_settings = Some(settings);
2141 self
2142 }
2143
2144 pub fn pseudo_order(mut self, order: PseudoHeaderOrder) -> Self {
2146 self.pseudo_order = Some(order);
2147 self
2148 }
2149
2150 pub fn timeouts(mut self, timeouts: Timeouts) -> Self {
2154 self.timeouts = timeouts;
2155 self
2156 }
2157
2158 pub fn api_timeouts(mut self) -> Self {
2162 self.timeouts = Timeouts::api_defaults();
2163 self
2164 }
2165
2166 pub fn streaming_timeouts(mut self) -> Self {
2171 self.timeouts = Timeouts::streaming_defaults();
2172 self
2173 }
2174
2175 #[deprecated(
2180 since = "1.0.2",
2181 note = "Use `timeouts()` or `total_timeout()` instead"
2182 )]
2183 pub fn timeout(mut self, timeout: Duration) -> Self {
2184 self.timeouts.total = Some(timeout);
2185 self
2186 }
2187
2188 pub fn total_timeout(mut self, timeout: Duration) -> Self {
2190 self.timeouts.total = Some(timeout);
2191 self
2192 }
2193
2194 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
2196 self.timeouts.connect = Some(timeout);
2197 self
2198 }
2199
2200 pub fn ttfb_timeout(mut self, timeout: Duration) -> Self {
2202 self.timeouts.ttfb = Some(timeout);
2203 self
2204 }
2205
2206 pub fn read_timeout(mut self, timeout: Duration) -> Self {
2208 self.timeouts.read_idle = Some(timeout);
2209 self
2210 }
2211
2212 pub fn write_timeout(mut self, timeout: Duration) -> Self {
2214 self.timeouts.write_idle = Some(timeout);
2215 self
2216 }
2217
2218 pub fn pool_acquire_timeout(mut self, timeout: Duration) -> Self {
2220 self.timeouts.pool_acquire = Some(timeout);
2221 self
2222 }
2223
2224 pub fn pool_idle_timeout(mut self, timeout: Duration) -> Self {
2226 self.pool_idle_timeout = timeout;
2227 self
2228 }
2229
2230 pub fn pool_max_idle_per_host(mut self, max: usize) -> Self {
2232 self.pool_max_idle_per_host = max;
2233 self
2234 }
2235
2236 pub fn h1_max_connections_per_origin(mut self, max: usize) -> Self {
2242 self.h1_max_connections_per_origin = max;
2243 self
2244 }
2245
2246 pub fn h1_max_connections_per_host(self, max: usize) -> Self {
2248 self.h1_max_connections_per_origin(max)
2249 }
2250
2251 pub fn hickory_dns(mut self, enable: bool) -> Self {
2255 self.dns_config = self.dns_config.with_cache_enabled(enable);
2256 self
2257 }
2258
2259 pub fn trust_dns(self, enable: bool) -> Self {
2261 self.hickory_dns(enable)
2262 }
2263
2264 pub fn dns_cache_ttl(mut self, ttl: Duration) -> Self {
2266 self.dns_config = self.dns_config.with_cache_ttl(ttl);
2267 self
2268 }
2269
2270 pub fn resolve(self, domain: &str, addr: SocketAddr) -> Self {
2272 self.resolve_to_addrs(domain, &[addr])
2273 }
2274
2275 pub fn resolve_to_addrs(mut self, domain: &str, addrs: &[SocketAddr]) -> Self {
2277 self.dns_config = self.dns_config.with_override(domain, addrs.to_vec());
2278 self
2279 }
2280
2281 pub fn dns_resolver<R: Resolve + 'static>(mut self, resolver: Arc<R>) -> Self {
2283 self.dns_config = self.dns_config.with_resolver(resolver);
2284 self
2285 }
2286
2287 pub fn dns_resolver2<R: Resolve + 'static>(mut self, resolver: R) -> Self {
2289 self.dns_config = self.dns_config.with_resolver(Arc::new(resolver));
2290 self
2291 }
2292
2293 pub fn tcp_keepalive(mut self, val: Option<Duration>) -> Self {
2295 self.tcp_keepalive = val;
2296 self
2297 }
2298
2299 pub fn tcp_keepalive_interval(mut self, val: Option<Duration>) -> Self {
2301 self.tcp_keepalive_interval = val;
2302 self
2303 }
2304
2305 pub fn tcp_keepalive_retries(mut self, retries: Option<u32>) -> Self {
2307 self.tcp_keepalive_retries = retries;
2308 self
2309 }
2310
2311 pub fn with_tcp_notsent_lowat(mut self, bytes: u32) -> Self {
2316 let mut fp = self.tcp_fingerprint.take().unwrap_or_default();
2317 fp.tcp_notsent_lowat = Some(bytes);
2318 self.tcp_fingerprint = Some(fp);
2319 self
2320 }
2321
2322 pub fn http2_initial_stream_window_size(mut self, size: Option<u32>) -> Self {
2324 if let Some(size) = size {
2325 let mut settings = self
2326 .http2_settings
2327 .unwrap_or_else(|| self.fingerprint.http2_settings());
2328 settings.initial_window_size = size;
2329 self.http2_settings = Some(settings);
2330 }
2331 self
2332 }
2333
2334 pub fn http2_initial_connection_window_size(mut self, size: Option<u32>) -> Self {
2336 if let Some(size) = size {
2337 let mut settings = self
2338 .http2_settings
2339 .unwrap_or_else(|| self.fingerprint.http2_settings());
2340 settings.initial_window_update = size.saturating_sub(65_535);
2341 self.http2_settings = Some(settings);
2342 }
2343 self
2344 }
2345
/// Accepts an adaptive-window flag and returns the builder unchanged; the
/// argument is ignored.
pub fn http2_adaptive_window(self, _enabled: bool) -> Self {
    self
}
2351
2352 pub fn http2_keep_alive_interval(mut self, interval: Option<Duration>) -> Self {
2354 self.h2_transport_config.keep_alive_interval = interval;
2355 self
2356 }
2357
2358 pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
2360 self.h2_transport_config.keep_alive_timeout = timeout;
2361 self
2362 }
2363
2364 pub fn http2_keep_alive_while_idle(mut self, enabled: bool) -> Self {
2366 self.h2_transport_config.keep_alive_while_idle = enabled;
2367 self
2368 }
2369
2370 pub fn h2_max_concurrent_streams_per_connection(mut self, max: u32) -> Self {
2376 self.h2_transport_config
2377 .max_concurrent_streams_per_connection = (max > 0).then_some(max);
2378 self
2379 }
2380
2381 pub fn h2_max_streams_per_origin(self, max: u32) -> Self {
2383 self.h2_max_concurrent_streams_per_connection(max)
2384 }
2385
2386 pub fn h2_streaming_body_buffer_slots(mut self, slots: usize) -> Self {
2388 self.h2_transport_config.streaming_body_buffer_slots = slots.max(1);
2389 self
2390 }
2391
2392 pub fn h2_body_buffer_slots(self, slots: usize) -> Self {
2394 self.h2_streaming_body_buffer_slots(slots)
2395 }
2396
2397 pub fn capacity_policy(mut self, policy: CapacityPolicy) -> Self {
2399 self.h1_max_connections_per_origin = policy.max_pending_per_origin;
2400 self.h2_transport_config
2401 .max_concurrent_streams_per_connection =
2402 Some(policy.max_pending_per_origin.min(u32::MAX as usize) as u32);
2403 self.h2_transport_config.streaming_body_buffer_slots =
2404 policy.streaming_body_buffer_slots.max(1);
2405 self.h3_transport_config.streaming_body_buffer_slots =
2406 policy.streaming_body_buffer_slots.max(1);
2407 self.h3_transport_config.tunnel_outbound_byte_budget = policy
2408 .h3_tunnel_outbound_byte_budget
2409 .max(crate::transport::h3::MIN_H3_TUNNEL_OUTBOUND_BYTE_BUDGET);
2410 self.h3_transport_config.tunnel_inbound_byte_budget = policy
2411 .h3_tunnel_inbound_byte_budget
2412 .max(crate::transport::h3::MIN_H3_TUNNEL_INBOUND_BYTE_BUDGET);
2413 self
2414 }
2415
2416 pub fn h2_direct_streaming_responses(mut self, enabled: bool) -> Self {
2424 self.h2_direct_streaming_responses = enabled;
2425 self
2426 }
2427
2428 pub fn h3_max_idle_timeout(mut self, timeout_ms: u64) -> Self {
2430 self.h3_max_idle_timeout = Some(timeout_ms);
2431 self
2432 }
2433
2434 pub fn h3_fingerprint(mut self, fingerprint: Http3Fingerprint) -> Self {
2436 self.h3_fingerprint = Some(fingerprint);
2437 self
2438 }
2439
2440 fn update_h3_fingerprint(mut self, update: impl FnOnce(&mut Http3Fingerprint)) -> Self {
2441 let mut fingerprint = self
2442 .h3_fingerprint
2443 .take()
2444 .unwrap_or_else(|| self.fingerprint.http3_fingerprint());
2445 update(&mut fingerprint);
2446 self.h3_fingerprint = Some(fingerprint);
2447 self
2448 }
2449
2450 pub fn h3_initial_max_data(self, bytes: u64) -> Self {
2452 self.update_h3_fingerprint(|fingerprint| {
2453 fingerprint.transport.initial_max_data = bytes;
2454 })
2455 }
2456
2457 pub fn h3_initial_max_stream_data_bidi_local(self, bytes: u64) -> Self {
2459 self.update_h3_fingerprint(|fingerprint| {
2460 fingerprint.transport.initial_max_stream_data_bidi_local = bytes;
2461 })
2462 }
2463
2464 pub fn h3_initial_max_stream_data_bidi_remote(self, bytes: u64) -> Self {
2466 self.update_h3_fingerprint(|fingerprint| {
2467 fingerprint.transport.initial_max_stream_data_bidi_remote = bytes;
2468 })
2469 }
2470
2471 pub fn h3_initial_max_stream_data_uni(self, bytes: u64) -> Self {
2473 self.update_h3_fingerprint(|fingerprint| {
2474 fingerprint.transport.initial_max_stream_data_uni = bytes;
2475 })
2476 }
2477
2478 pub fn h3_initial_max_streams_bidi(self, streams: u64) -> Self {
2480 self.update_h3_fingerprint(|fingerprint| {
2481 fingerprint.transport.initial_max_streams_bidi = streams;
2482 })
2483 }
2484
2485 pub fn h3_initial_max_streams_uni(self, streams: u64) -> Self {
2487 self.update_h3_fingerprint(|fingerprint| {
2488 fingerprint.transport.initial_max_streams_uni = streams;
2489 })
2490 }
2491
2492 pub fn h3_max_connection_window(self, bytes: u64) -> Self {
2494 self.update_h3_fingerprint(|fingerprint| {
2495 fingerprint.transport.max_connection_window = bytes;
2496 })
2497 }
2498
2499 pub fn h3_max_stream_window(self, bytes: u64) -> Self {
2501 self.update_h3_fingerprint(|fingerprint| {
2502 fingerprint.transport.max_stream_window = bytes;
2503 })
2504 }
2505
2506 pub fn h3_streaming_body_buffer_slots(mut self, slots: usize) -> Self {
2508 self.h3_transport_config.streaming_body_buffer_slots = slots.max(1);
2509 self
2510 }
2511
2512 pub fn h3_body_buffer_slots(self, slots: usize) -> Self {
2514 self.h3_streaming_body_buffer_slots(slots)
2515 }
2516
2517 pub fn h3_tunnel_outbound_byte_budget(mut self, bytes: usize) -> Self {
2519 self.h3_transport_config.tunnel_outbound_byte_budget =
2520 bytes.max(crate::transport::h3::MIN_H3_TUNNEL_OUTBOUND_BYTE_BUDGET);
2521 self
2522 }
2523
2524 pub fn h3_tunnel_inbound_byte_budget(mut self, bytes: usize) -> Self {
2526 self.h3_transport_config.tunnel_inbound_byte_budget =
2527 bytes.max(crate::transport::h3::MIN_H3_TUNNEL_INBOUND_BYTE_BUDGET);
2528 self
2529 }
2530
2531 pub fn h3_backend(mut self, backend: H3Backend) -> Self {
2533 self.h3_backend = backend;
2534 self
2535 }
2536
2537 pub fn default_headers(mut self, headers: impl Into<Headers>) -> Self {
2539 self.default_headers = headers.into();
2540 self
2541 }
2542
2543 pub fn default_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
2545 self.default_headers.insert(name, value);
2546 self
2547 }
2548
2549 pub fn user_agent(mut self, value: impl Into<String>) -> Self {
2551 self.default_headers.insert("User-Agent", value.into());
2552 self
2553 }
2554
2555 pub fn redirect_policy(mut self, policy: RedirectPolicy) -> Self {
2557 self.redirect_policy = policy;
2558 self
2559 }
2560
2561 pub fn cookie_store(mut self, enabled: bool) -> Self {
2563 if enabled {
2564 self.cookie_store = Some(Arc::new(RwLock::new(CookieJar::new())));
2565 } else {
2566 self.cookie_store = None;
2567 }
2568 self
2569 }
2570
2571 pub fn cookie_jar(mut self, jar: Arc<RwLock<CookieJar>>) -> Self {
2573 self.cookie_store = Some(jar);
2574 self
2575 }
2576
2577 pub fn prefer_http2(mut self, prefer: bool) -> Self {
2579 self.prefer_http2 = prefer;
2580 self
2581 }
2582
2583 pub fn h3_upgrade(mut self, enabled: bool) -> Self {
2590 self.h3_upgrade_enabled = enabled;
2591 self
2592 }
2593
2594 pub fn http2_prior_knowledge(mut self, enabled: bool) -> Self {
2597 self.http2_prior_knowledge = enabled;
2598 if enabled {
2600 self.prefer_http2 = true;
2601 }
2602 self
2603 }
2604
2605 pub fn add_root_certificate(mut self, cert: Vec<u8>) -> Self {
2607 self.root_certs.push(cert);
2608 self
2609 }
2610
2611 pub fn with_platform_roots(mut self, enabled: bool) -> Self {
2622 self.use_platform_roots = enabled;
2623 self
2624 }
2625
2626 pub fn danger_accept_invalid_certs(mut self, accept: bool) -> Self {
2632 self.danger_accept_invalid_certs = accept;
2633 self
2634 }
2635
2636 pub fn localhost_allows_invalid_certs(mut self, allow: bool) -> Self {
2644 self.localhost_allows_invalid_certs = allow;
2645 self
2646 }
2647
2648 pub fn http_tls_early_data(mut self, enabled: bool) -> Self {
2650 self.http_tls_early_data = enabled;
2651 self
2652 }
2653
2654 pub fn build(self) -> Result<Client> {
2656 let session_cache = Arc::new(SessionCache::new());
2657 let tls_fingerprint = self.fingerprint.tls_fingerprint();
2659 let root_certs = self.root_certs.clone();
2660 let mut connector = BoringConnector::with_fingerprint(tls_fingerprint.clone())
2661 .with_shared_session_cache(session_cache.clone())
2662 .with_early_data(self.http_tls_early_data)
2663 .with_root_certificates(self.root_certs.clone())
2664 .with_platform_roots(self.use_platform_roots)
2665 .with_dns_config(self.dns_config.clone())
2666 .tcp_keepalive(self.tcp_keepalive)
2667 .tcp_keepalive_interval(self.tcp_keepalive_interval)
2668 .tcp_keepalive_retries(self.tcp_keepalive_retries);
2669
2670 if let Some(tcp_fp) = &self.tcp_fingerprint {
2671 connector = connector.with_tcp_fingerprint(tcp_fp.clone());
2672 }
2673
2674 if self.danger_accept_invalid_certs {
2676 connector = connector.danger_accept_invalid_certs(true);
2677 }
2678
2679 let mut insecure_connector = BoringConnector::with_fingerprint(tls_fingerprint.clone())
2681 .with_shared_session_cache(session_cache)
2682 .with_early_data(self.http_tls_early_data)
2683 .with_root_certificates(self.root_certs.clone())
2684 .with_platform_roots(self.use_platform_roots)
2685 .with_dns_config(self.dns_config.clone())
2686 .tcp_keepalive(self.tcp_keepalive)
2687 .tcp_keepalive_interval(self.tcp_keepalive_interval)
2688 .tcp_keepalive_retries(self.tcp_keepalive_retries)
2689 .danger_accept_invalid_certs(true);
2690
2691 if let Some(tcp_fp) = &self.tcp_fingerprint {
2692 insecure_connector = insecure_connector.with_tcp_fingerprint(tcp_fp.clone());
2693 }
2694
2695 let h3_fingerprint = self
2697 .h3_fingerprint
2698 .unwrap_or_else(|| self.fingerprint.http3_fingerprint());
2699 let mut h3_client = H3Client::with_fingerprint(tls_fingerprint)
2700 .with_http3_fingerprint(h3_fingerprint)
2701 .with_h3_backend(self.h3_backend)
2702 .with_transport_config(self.h3_transport_config)
2703 .with_root_certificates(root_certs)
2704 .with_platform_roots(self.use_platform_roots)
2705 .with_dns_config(self.dns_config.clone());
2706 if let Some(timeout_ms) = self.h3_max_idle_timeout {
2707 h3_client = h3_client.with_max_idle_timeout(timeout_ms);
2708 }
2709 if self.danger_accept_invalid_certs {
2710 h3_client = h3_client.danger_accept_invalid_certs(true);
2711 }
2712
2713 let http2_settings = self
2715 .http2_settings
2716 .unwrap_or_else(|| self.fingerprint.http2_settings());
2717 let pseudo_order = self
2718 .pseudo_order
2719 .unwrap_or_else(|| self.fingerprint.http2_pseudo_order());
2720
2721 let mut h2_transport_config = self.h2_transport_config.clone();
2722 if h2_transport_config.keep_alive_interval.is_none() {
2723 h2_transport_config.keep_alive_interval = http2_settings.ping_interval;
2724 h2_transport_config.keep_alive_while_idle = true;
2725 }
2726
2727 let default_version = if self.prefer_http2 {
2729 HttpVersion::Http2
2730 } else {
2731 HttpVersion::Http1_1
2732 };
2733
2734 let h1_pool = Arc::new(ConnectionPool::with_config(
2738 self.pool_idle_timeout,
2739 self.pool_max_idle_per_host,
2740 100,
2741 ));
2742
2743 let pool_reuse_counter: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
2747 let h3_client = h3_client.with_pool_reuse_counter(pool_reuse_counter.clone());
2748
2749 Ok(Client {
2750 connector,
2751 insecure_connector,
2752 h3_client,
2753 alt_svc_cache: Arc::new(AltSvcCache::new()),
2754 h2_pool: Arc::new(RwLock::new(HashMap::new())),
2755 h2_direct_pool: Arc::new(StdMutex::new(HashMap::new())),
2756 h1_pool,
2757 h1_connection_slots: Arc::new(RwLock::new(HashMap::new())),
2758 h1_max_connections_per_origin: self.h1_max_connections_per_origin,
2759 http2_settings,
2760 pseudo_order,
2761 default_version,
2762 timeouts: self.timeouts,
2763 h2_transport_config,
2764 h2_direct_streaming_responses: self.h2_direct_streaming_responses,
2765 h3_upgrade_enabled: self.h3_upgrade_enabled,
2766 http2_prior_knowledge: self.http2_prior_knowledge,
2767 danger_accept_invalid_certs: self.danger_accept_invalid_certs,
2768 localhost_allows_invalid_certs: self.localhost_allows_invalid_certs,
2769 default_headers: self.default_headers,
2770 redirect_policy: self.redirect_policy,
2771 cookie_store: self.cookie_store,
2772 fingerprint: self.fingerprint,
2773 http_tls_early_data: self.http_tls_early_data,
2774 pool_reuse_counter,
2775 })
2776 }
2777}
2778
2779impl Default for ClientBuilder {
2780 fn default() -> Self {
2781 Self::new()
2782 }
2783}
2784
2785impl Default for AltSvcCache {
2786 fn default() -> Self {
2787 Self::new()
2788 }
2789}