1use base64::Engine;
11use bytes::Bytes;
12use http::{Method, Uri};
13use serde::Serialize;
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::RwLock;
18use tokio::time::timeout as tokio_timeout;
19use url::Url;
20
21use crate::cookie::CookieJar;
22use crate::error::{Error, Result};
23use crate::fingerprint::{http2::Http2Settings, FingerprintProfile};
24use crate::headers::Headers;
25use crate::pool::alt_svc::AltSvcCache;
26use crate::pool::multiplexer::{ConnectionPool, PoolKey};
27use crate::request::{Body, IntoUrl, RedirectPolicy, Request};
28use crate::response::Response;
29use crate::timeouts::Timeouts;
30use crate::transport::connector::{BoringConnector, MaybeHttpsStream};
31use crate::transport::h1::H1Connection;
32use crate::transport::h2::{H2Connection, H2PooledConnection, H2Tunnel, PseudoHeaderOrder};
33use crate::transport::h3::H3Client;
34use crate::version::HttpVersion;
35use crate::websocket::{WebSocketBuilder, WebSocketClientParts};
36
/// HTTP client able to send requests over HTTP/1.1, HTTP/2 and HTTP/3.
///
/// Create one with [`Client::new`] or configure it through [`Client::builder`].
/// The pools, Alt-Svc cache and optional cookie jar are held behind `Arc`, so
/// clones of a client share them.
#[derive(Clone)]
pub struct Client {
    /// Connector used unless `connector_for_uri` selects `insecure_connector`.
    connector: BoringConnector,
    /// Connector built with invalid-certificate acceptance enabled.
    insecure_connector: BoringConnector,
    /// HTTP/3 client used for `Http3`/`Http3Only` requests and Alt-Svc upgrades.
    h3_client: H3Client,
    /// Cache of Alt-Svc advertisements seen on responses.
    alt_svc_cache: Arc<AltSvcCache>,
    /// Pooled HTTP/2 connections, keyed by `PoolKey`.
    h2_pool: Arc<RwLock<HashMap<PoolKey, H2PooledConnection>>>,
    /// Pool of HTTP/1.1 streams returned after a successful request.
    h1_pool: Arc<ConnectionPool>,
    /// Settings passed to every new HTTP/2 connection.
    http2_settings: Http2Settings,
    /// Pseudo-header ordering passed to every new HTTP/2 connection.
    pseudo_order: PseudoHeaderOrder,
    /// Version used when a request does not specify one.
    default_version: HttpVersion,
    /// Client-wide timeouts; a per-request timeout overrides `total`.
    timeouts: Timeouts,
    /// Whether Alt-Svc headers are recorded and used to attempt HTTP/3.
    h3_upgrade_enabled: bool,
    /// Whether HTTP/2 may be spoken without ALPN having selected `h2`.
    http2_prior_knowledge: bool,
    /// When set, every connection uses `insecure_connector`.
    danger_accept_invalid_certs: bool,
    /// When set, connections to localhost use `insecure_connector`.
    localhost_allows_invalid_certs: bool,
    /// Headers every request builder starts with.
    default_headers: Headers,
    /// How `execute` reacts to redirect responses.
    redirect_policy: RedirectPolicy,
    /// Optional cookie jar consulted before and updated after each request.
    cookie_store: Option<Arc<RwLock<CookieJar>>>,
}
76
/// Builder for a single request issued through a [`Client`].
pub struct RequestBuilder<'a> {
    /// Client that will execute the request.
    client: &'a Client,
    /// Target URL; `None` when the supplied URL failed to convert.
    url: Option<Url>,
    /// HTTP method of the request.
    method: Method,
    /// Request headers; initialised from the client's default headers.
    headers: Headers,
    /// Request body; starts as `Body::Empty`.
    body: Body,
    /// Per-request HTTP version; the client's default is used when `None`.
    version: Option<HttpVersion>,
    /// Per-request timeout; replaces the client's total timeout when set.
    timeout: Option<Duration>,
    /// First error recorded while building; reported by `build`.
    error: Option<Error>,
}
88
/// Builder for a WebSocket tunnel opened over HTTP/2 (RFC 8441).
pub struct WebSocketH2Builder<'a> {
    /// Client whose connectors and HTTP/2 pool are used.
    client: &'a Client,
    /// Target `ws://` or `wss://` URL; `None` when the supplied URL failed to convert.
    url: Option<Url>,
    /// Headers sent with the tunnel request; initialised from the client's defaults.
    headers: Headers,
    /// URL conversion error, reported by `open`.
    error: Option<Error>,
}
96
/// Builder that collects the configuration for a [`Client`].
pub struct ClientBuilder {
    /// Fingerprint profile from which the TLS fingerprint is derived.
    fingerprint: FingerprintProfile,
    /// Explicit HTTP/2 settings; the type's default is used when `None`.
    http2_settings: Option<Http2Settings>,
    /// Pseudo-header ordering for HTTP/2 connections.
    pseudo_order: PseudoHeaderOrder,
    /// Timeouts handed to the built client.
    timeouts: Timeouts,
    /// Selects `Http2` (true) or `Http1_1` (false) as the client's default version.
    prefer_http2: bool,
    /// Whether the client records Alt-Svc and attempts HTTP/3 upgrades.
    h3_upgrade_enabled: bool,
    /// Whether HTTP/2 may be used without ALPN selecting `h2`.
    http2_prior_knowledge: bool,
    /// Additional root certificates passed to both connectors.
    root_certs: Vec<Vec<u8>>,
    /// Whether the connectors are asked to use platform root certificates.
    use_platform_roots: bool,
    /// Whether the regular connector also accepts invalid certificates.
    danger_accept_invalid_certs: bool,
    /// Whether localhost connections may use the insecure connector.
    localhost_allows_invalid_certs: bool,
    /// Headers every request builder starts with.
    default_headers: Headers,
    /// Redirect policy handed to the built client.
    redirect_policy: RedirectPolicy,
    /// Optional cookie jar handed to the built client.
    cookie_store: Option<Arc<RwLock<CookieJar>>>,
}
120
121impl Client {
122 pub fn new() -> Result<Self> {
124 ClientBuilder::new().build()
125 }
126
127 pub fn builder() -> ClientBuilder {
129 ClientBuilder::new()
130 }
131
132 pub fn get(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
134 RequestBuilder::new(self, Method::GET, url)
135 }
136
137 pub fn post(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
139 RequestBuilder::new(self, Method::POST, url)
140 }
141
142 pub fn put(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
144 RequestBuilder::new(self, Method::PUT, url)
145 }
146
147 pub fn delete(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
149 RequestBuilder::new(self, Method::DELETE, url)
150 }
151
152 pub fn head(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
154 RequestBuilder::new(self, Method::HEAD, url)
155 }
156
157 pub fn patch(&self, url: impl IntoUrl) -> RequestBuilder<'_> {
159 RequestBuilder::new(self, Method::PATCH, url)
160 }
161
162 pub fn request(&self, method: Method, url: impl IntoUrl) -> RequestBuilder<'_> {
164 RequestBuilder::new(self, method, url)
165 }
166
167 pub fn websocket_h2(&self, url: impl IntoUrl) -> WebSocketH2Builder<'_> {
169 WebSocketH2Builder::new(self, url)
170 }
171
172 pub fn websocket(&self, url: impl IntoUrl) -> WebSocketBuilder<'_> {
174 Client::websocket_with_parts(
175 WebSocketClientParts {
176 connector: &self.connector,
177 insecure_connector: &self.insecure_connector,
178 default_headers: &self.default_headers,
179 timeouts: &self.timeouts,
180 cookie_store: self.cookie_store.as_ref(),
181 danger_accept_invalid_certs: self.danger_accept_invalid_certs,
182 localhost_allows_invalid_certs: self.localhost_allows_invalid_certs,
183 },
184 url,
185 )
186 }
187
188 pub fn alt_svc_cache(&self) -> &Arc<AltSvcCache> {
190 &self.alt_svc_cache
191 }
192
193 fn is_localhost(host: &str) -> bool {
195 host == "localhost" || host == "127.0.0.1" || host == "::1"
196 }
197
198 fn connector_for_uri(&self, uri: &Uri) -> &BoringConnector {
200 if self.danger_accept_invalid_certs {
202 return &self.insecure_connector;
203 }
204
205 if self.localhost_allows_invalid_certs {
207 if let Some(host) = uri.host() {
208 if Self::is_localhost(host) {
209 return &self.insecure_connector;
210 }
211 }
212 }
213
214 &self.connector
215 }
216}
217
218impl<'a> WebSocketH2Builder<'a> {
219 fn new(client: &'a Client, url: impl IntoUrl) -> Self {
220 let mut error = None;
221 let url = match url.into_url() {
222 Ok(url) => Some(url),
223 Err(err) => {
224 error = Some(err);
225 None
226 }
227 };
228
229 Self {
230 client,
231 url,
232 headers: client.default_headers.clone(),
233 error,
234 }
235 }
236
237 pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
239 self.headers.insert(key, value);
240 self
241 }
242
243 pub fn headers(mut self, headers: impl Into<Headers>) -> Self {
245 self.headers = headers.into();
246 self
247 }
248
249 pub async fn open(self) -> Result<H2Tunnel> {
251 if let Some(err) = self.error {
252 return Err(err);
253 }
254
255 let url = self.url.ok_or_else(|| Error::missing("websocket URL"))?;
256
257 let websocket_scheme = url.scheme();
258 let h2_scheme = match websocket_scheme {
259 "wss" => "https",
260 "ws" => {
261 if !self.client.http2_prior_knowledge {
262 return Err(Error::WebSocketUnsupported(
263 "ws:// RFC 8441 requires explicit HTTP/2 prior knowledge".into(),
264 ));
265 }
266 "http"
267 }
268 other => {
269 return Err(Error::WebSocketUnsupported(format!(
270 "RFC 8441 requires ws:// or wss:// URL, got {other}"
271 )));
272 }
273 };
274
275 let mut h2_url = url.clone();
276 h2_url
277 .set_scheme(h2_scheme)
278 .map_err(|_| Error::WebSocketUnsupported("invalid WebSocket URL scheme".into()))?;
279
280 let uri: Uri = h2_url
281 .as_str()
282 .parse()
283 .map_err(|e| Error::HttpProtocol(format!("Invalid URI: {}", e)))?;
284
285 let headers = self.headers.to_vec();
286 let pool_key = Client::make_pool_key(&uri);
287
288 if let Some(conn) = {
289 let pool = self.client.h2_pool.read().await;
290 pool.get(&pool_key).cloned()
291 } {
292 match conn
293 .open_websocket_tunnel(uri.clone(), headers.clone())
294 .await
295 {
296 Ok(tunnel) => return Ok(tunnel),
297 Err(err) => {
298 tracing::debug!("Pooled RFC 8441 tunnel open failed, reconnecting: {}", err);
299 let mut pool = self.client.h2_pool.write().await;
300 pool.remove(&pool_key);
301 }
302 }
303 }
304
305 let connector = self.client.connector_for_uri(&uri);
306 let stream = connector.connect(&uri).await?;
307
308 let use_http2 = if websocket_scheme == "ws" && self.client.http2_prior_knowledge {
309 true
310 } else if let MaybeHttpsStream::Https(ref ssl_stream) = stream {
311 ssl_stream.ssl().selected_alpn_protocol() == Some(b"h2")
312 } else {
313 false
314 };
315
316 if !use_http2 {
317 return Err(Error::WebSocketUnsupported(
318 "RFC 8441 WebSocket requires ALPN h2 or explicit HTTP/2 prior knowledge".into(),
319 ));
320 }
321
322 let h2_conn = H2Connection::connect(
323 stream,
324 self.client.http2_settings.clone(),
325 self.client.pseudo_order,
326 )
327 .await?;
328 let pooled_conn = H2PooledConnection::new(h2_conn);
329
330 {
331 let mut pool = self.client.h2_pool.write().await;
332 pool.insert(pool_key, pooled_conn.clone());
333 }
334
335 pooled_conn.open_websocket_tunnel(uri, headers).await
336 }
337}
338
339impl<'a> RequestBuilder<'a> {
340 fn new(client: &'a Client, method: Method, url: impl IntoUrl) -> Self {
341 let mut error = None;
342 let url = match url.into_url() {
343 Ok(url) => Some(url),
344 Err(err) => {
345 error = Some(err);
346 None
347 }
348 };
349
350 Self {
351 client,
352 url,
353 method,
354 headers: client.default_headers.clone(),
355 body: Body::Empty,
356 version: None,
357 timeout: None,
358 error,
359 }
360 }
361
362 fn set_error(&mut self, error: Error) {
363 if self.error.is_none() {
364 self.error = Some(error);
365 }
366 }
367
368 fn ensure_content_type(&mut self, value: &str) {
369 if !self.headers.contains("content-type") {
370 self.headers.insert("Content-Type", value.to_string());
371 }
372 }
373
374 pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
376 self.headers.insert(key, value);
377 self
378 }
379
380 pub fn header_append(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
382 self.headers.append(key, value);
383 self
384 }
385
386 pub fn headers(mut self, headers: impl Into<Headers>) -> Self {
388 self.headers = headers.into();
389 self
390 }
391
392 pub fn body(mut self, body: impl Into<Body>) -> Self {
394 self.body = body.into();
395 self
396 }
397
398 pub fn query<T: Serialize + ?Sized>(mut self, query: &T) -> Self {
400 if self.error.is_some() {
401 return self;
402 }
403
404 let url = match self.url.as_mut() {
405 Some(url) => url,
406 None => return self,
407 };
408
409 match serde_urlencoded::to_string(query) {
410 Ok(encoded) => {
411 if !encoded.is_empty() {
412 let merged = match url.query() {
413 Some(existing) if !existing.is_empty() => {
414 format!("{}&{}", existing, encoded)
415 }
416 _ => encoded,
417 };
418 url.set_query(Some(&merged));
419 }
420 }
421 Err(err) => self.set_error(err.into()),
422 }
423
424 self
425 }
426
427 pub fn json<T: Serialize + ?Sized>(mut self, json: &T) -> Self {
429 if self.error.is_some() {
430 return self;
431 }
432
433 match serde_json::to_vec(json) {
434 Ok(bytes) => {
435 self.body = Body::Json(bytes);
436 self.ensure_content_type("application/json");
437 }
438 Err(err) => self.set_error(err.into()),
439 }
440
441 self
442 }
443
444 pub fn form<T: Serialize + ?Sized>(mut self, form: &T) -> Self {
446 if self.error.is_some() {
447 return self;
448 }
449
450 match serde_urlencoded::to_string(form) {
451 Ok(encoded) => {
452 self.body = Body::Form(encoded);
453 self.ensure_content_type("application/x-www-form-urlencoded");
454 }
455 Err(err) => self.set_error(err.into()),
456 }
457
458 self
459 }
460
461 pub fn bearer_auth(mut self, token: impl AsRef<str>) -> Self {
463 self.headers
464 .insert("Authorization", format!("Bearer {}", token.as_ref()));
465 self
466 }
467
468 pub fn basic_auth<P: AsRef<str>>(
470 mut self,
471 username: impl AsRef<str>,
472 password: Option<P>,
473 ) -> Self {
474 let creds = match password {
475 Some(p) => format!("{}:{}", username.as_ref(), p.as_ref()),
476 None => format!("{}:", username.as_ref()),
477 };
478 let encoded = base64::engine::general_purpose::STANDARD.encode(creds.as_bytes());
479 self.headers
480 .insert("Authorization", format!("Basic {}", encoded));
481 self
482 }
483
484 pub fn timeout(mut self, timeout: Duration) -> Self {
486 self.timeout = Some(timeout);
487 self
488 }
489
490 pub fn version(mut self, version: HttpVersion) -> Self {
492 self.version = Some(version);
493 self
494 }
495
496 pub fn build(self) -> Result<Request> {
498 if let Some(error) = self.error {
499 return Err(error);
500 }
501
502 let url = self.url.ok_or_else(|| Error::missing("url"))?;
503
504 Ok(Request {
505 method: self.method,
506 url,
507 headers: self.headers,
508 body: self.body,
509 version: self.version,
510 timeout: self.timeout,
511 })
512 }
513
514 pub async fn send(self) -> Result<Response> {
516 let client = self.client.clone();
517 let request = self.build()?;
518 client.execute(request).await
519 }
520
521 pub async fn send_streaming(
525 self,
526 ) -> Result<(
527 Response,
528 tokio::sync::mpsc::Receiver<std::result::Result<Bytes, crate::transport::h2::H2Error>>,
529 )> {
530 let client = self.client.clone();
531 let request = self.build()?;
532 let mut timeouts = client.timeouts.clone();
533 if let Some(total) = request.timeout {
534 timeouts.total = Some(total);
535 }
536 let mut headers = request.headers.clone();
537
538 if let Some(jar) = &client.cookie_store {
539 if !headers.contains("cookie") {
540 if let Some(cookie_header) =
541 jar.read().await.build_cookie_header(request.url.as_str())
542 {
543 headers.insert("Cookie", cookie_header);
544 }
545 }
546 }
547
548 let version = request.version.unwrap_or(client.default_version);
549
550 if !matches!(version, HttpVersion::Http2 | HttpVersion::Auto) {
552 return Err(Error::HttpProtocol(
553 "Streaming only supported for HTTP/2".into(),
554 ));
555 }
556
557 let uri: Uri = request
559 .url
560 .as_str()
561 .parse()
562 .map_err(|e| Error::HttpProtocol(format!("Invalid URI: {}", e)))?;
563
564 let connector = client.connector_for_uri(&uri);
567 let connect_fut = connector.connect(&uri);
568 let stream = if let Some(connect_timeout) = timeouts.connect {
569 tokio_timeout(connect_timeout, connect_fut)
570 .await
571 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
572 } else {
573 connect_fut.await?
574 };
575
576 let alpn = stream.alpn_protocol();
578 if !alpn.is_h2() {
579 return Err(Error::HttpProtocol(format!(
580 "Expected h2 ALPN, got {:?}",
581 alpn
582 )));
583 }
584
585 let h2_connect_fut =
587 H2Connection::connect(stream, client.http2_settings.clone(), client.pseudo_order);
588 let mut h2_conn = if let Some(connect_timeout) = timeouts.connect {
589 tokio_timeout(connect_timeout, h2_connect_fut)
590 .await
591 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
592 } else {
593 h2_connect_fut.await?
594 };
595
596 let mut path = request.url.path().to_string();
598 if path.is_empty() {
599 path = "/".to_string();
600 }
601 if let Some(query) = request.url.query() {
602 path.push('?');
603 path.push_str(query);
604 }
605
606 let host = request.url.host_str().unwrap_or("localhost");
607 let authority = if let Some(port) = request.url.port_or_known_default() {
608 if port == 443 {
609 host.to_string()
610 } else {
611 format!("{}:{}", host, port)
612 }
613 } else {
614 host.to_string()
615 };
616
617 let full_uri = format!("https://{}{}", authority, path);
619 let mut request_builder = http::Request::builder()
620 .method(request.method.clone())
621 .uri(&full_uri);
622
623 for (key, value) in headers.iter() {
625 request_builder = request_builder.header(key, value);
626 }
627
628 let body = request.body.clone().into_bytes()?;
629 let http_request = request_builder
630 .body(body)
631 .map_err(|e| Error::HttpProtocol(format!("Failed to build request: {}", e)))?;
632
633 let send_fut = h2_conn.send_request_streaming(http_request);
635 let (response, rx) = if let Some(ttfb_timeout) = timeouts.ttfb {
636 tokio_timeout(ttfb_timeout, send_fut)
637 .await
638 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))??
639 } else {
640 send_fut.await?
641 };
642
643 tokio::spawn(async move {
645 loop {
646 match h2_conn.read_streaming_frames().await {
647 Ok(true) => continue,
648 Ok(false) => break,
649 Err(e) => {
650 tracing::debug!("Streaming read error: {}", e);
651 break;
652 }
653 }
654 }
655 });
656
657 let status = response.status().as_u16();
659 let headers = response
660 .headers()
661 .iter()
662 .map(|(k, v)| (k.as_str().to_string(), v.to_str().unwrap_or("").to_string()))
663 .collect::<Vec<(String, String)>>();
664
665 let our_response = crate::response::Response::new(
666 status,
667 Headers::from(headers),
668 Bytes::new(), "HTTP/2".to_string(),
670 );
671
672 let request_url = request.url.clone();
673 let our_response = our_response.with_url(request_url.clone());
674
675 if let Some(jar) = &client.cookie_store {
676 jar.write()
677 .await
678 .store_from_headers(our_response.headers(), request_url.as_str());
679 }
680
681 Ok((our_response, rx))
682 }
683}
684
685impl Client {
686 pub async fn execute(&self, mut request: Request) -> Result<Response> {
688 let policy = self.redirect_policy.clone();
689 let mut redirects = 0u32;
690
691 loop {
692 let mut headers = request.headers.clone();
693 let cookie_injected = self.apply_cookie_header(&request, &mut headers).await;
694 request.headers = headers;
695
696 let mut timeouts = self.timeouts.clone();
697 if let Some(total) = request.timeout {
698 timeouts.total = Some(total);
699 }
700
701 let response = self.execute_once(&request, &timeouts).await?;
702
703 self.store_cookies(&response, &request.url).await;
704
705 if matches!(policy, RedirectPolicy::None) || !response.is_redirect() {
706 return Ok(response);
707 }
708
709 let location = match response.redirect_url() {
710 Some(value) => value,
711 None => return Ok(response),
712 };
713
714 if let RedirectPolicy::Limited(limit) = policy {
715 if redirects >= limit {
716 return Err(Error::RedirectLimit { count: limit });
717 }
718 }
719
720 let next_url = request.url.join(location).map_err(Error::from)?;
721 let mut next_request = self.redirect_request(&request, &response, next_url);
722
723 if cookie_injected {
724 next_request.headers.remove("cookie");
725 }
726
727 request = next_request;
728 redirects += 1;
729 }
730 }
731
732 async fn execute_once(&self, request: &Request, timeouts: &Timeouts) -> Result<Response> {
733 let version = request.version.unwrap_or(self.default_version);
734
735 if matches!(version, HttpVersion::Http3Only) {
737 return self
738 .send_h3_for_url(request, request.url.clone(), timeouts)
739 .await;
740 }
741
742 if matches!(version, HttpVersion::Http3) {
744 match self
745 .send_h3_for_url(request, request.url.clone(), timeouts)
746 .await
747 {
748 Ok(response) => return Ok(response),
749 Err(e) => {
750 tracing::debug!("HTTP/3 failed, falling back to HTTP/1.1 or HTTP/2: {}", e);
751 }
753 }
754 }
755
756 if matches!(version, HttpVersion::Auto) && self.h3_upgrade_enabled {
758 let origin = Self::origin_for_url(&request.url);
759 if let Some(alt_svc) = self.alt_svc_cache.get_h3_alternative(&origin).await {
760 tracing::debug!(
761 "Alt-Svc indicates HTTP/3 support for {}, attempting upgrade",
762 origin
763 );
764
765 let mut h3_url = request.url.clone();
766 let _ = h3_url.set_scheme("https");
767 if let Some(ref host) = alt_svc.host {
768 h3_url
769 .set_host(Some(host))
770 .map_err(|_| Error::HttpProtocol("Invalid Alt-Svc host".into()))?;
771 }
772 let _ = h3_url.set_port(Some(alt_svc.port));
773
774 match self
775 .send_h3_for_url(request, h3_url.clone(), timeouts)
776 .await
777 {
778 Ok(response) => return Ok(response.with_url(h3_url)),
779 Err(e) => {
780 tracing::debug!("HTTP/3 upgrade failed, using HTTP/1.1 or HTTP/2: {}", e);
781 }
783 }
784 }
785 }
786
787 self.send_h1_h2(request, version, timeouts).await
789 }
790
791 async fn send_h3_for_url(
792 &self,
793 request: &Request,
794 url: Url,
795 timeouts: &Timeouts,
796 ) -> Result<Response> {
797 let body = if request.body.is_empty() {
798 None
799 } else {
800 Some(request.body.clone().into_bytes()?.to_vec())
801 };
802
803 let fut = self.h3_client.send_request(
804 url.as_str(),
805 request.method.as_str(),
806 request.headers.to_vec(),
807 body,
808 );
809
810 let response = if let Some(total_timeout) = timeouts.total {
812 tokio_timeout(total_timeout, fut)
813 .await
814 .map_err(|_| Error::TotalTimeout(total_timeout))??
815 } else {
816 fut.await?
817 };
818
819 Ok(response.with_url(url))
820 }
821
822 async fn send_h1_h2(
823 &self,
824 request: &Request,
825 version: HttpVersion,
826 timeouts: &Timeouts,
827 ) -> Result<Response> {
828 let request_url = request.url.clone();
830
831 let uri: Uri = request
833 .url
834 .as_str()
835 .parse()
836 .map_err(|e| Error::HttpProtocol(format!("Invalid URI: {}", e)))?;
837
838 let prefer_http2 = match version {
840 HttpVersion::Http1_1 => false,
841 HttpVersion::Http2 => true,
842 HttpVersion::Http3 | HttpVersion::Http3Only => {
843 return Err(Error::HttpProtocol("HTTP/3 should use send_h3".into()));
844 }
845 HttpVersion::Auto => matches!(self.default_version, HttpVersion::Http2),
846 };
847
848 let h3_upgrade_enabled = self.h3_upgrade_enabled;
850 let alt_svc_cache = self.alt_svc_cache.clone();
851 let origin = Self::origin_for_url(&request.url);
852
853 let headers_vec = request.headers.to_vec();
854 let body_bytes = if request.body.is_empty() {
855 None
856 } else {
857 Some(request.body.clone().into_bytes()?)
858 };
859
860 if prefer_http2 {
862 let pool_key = Self::make_pool_key(&uri);
863
864 let pooled = {
866 let pool = self.h2_pool.read().await;
867 pool.get(&pool_key).cloned()
868 };
869
870 if let Some(conn) = pooled {
871 let result = conn
873 .send_request(
874 request.method.clone(),
875 &uri,
876 headers_vec.clone(),
877 body_bytes.clone(),
878 )
879 .await;
880
881 match result {
882 Ok(response) => {
883 if h3_upgrade_enabled {
885 if let Some(alt_svc) = response.get_header("alt-svc") {
886 alt_svc_cache.parse_and_store(&origin, alt_svc).await;
887 }
888 }
889 return Ok(response.with_url(request_url));
890 }
891 Err(e) => {
892 tracing::debug!("Pooled HTTP/2 connection failed, creating new: {}", e);
894 let mut pool = self.h2_pool.write().await;
895 pool.remove(&pool_key);
896 }
897 }
898 }
899
900 let connector = self.connector_for_uri(&uri);
903 let connect_fut = connector.connect(&uri);
904 let stream = if let Some(connect_timeout) = timeouts.connect {
905 tokio_timeout(connect_timeout, connect_fut)
906 .await
907 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
908 } else {
909 connect_fut.await?
910 };
911
912 let use_http2 = if self.http2_prior_knowledge && !stream.alpn_protocol().is_h2() {
914 true
916 } else if let MaybeHttpsStream::Https(ref ssl_stream) = stream {
917 ssl_stream.ssl().selected_alpn_protocol() == Some(b"h2")
918 } else {
919 false
920 };
921
922 if use_http2 {
923 let h2_conn =
925 H2Connection::connect(stream, self.http2_settings.clone(), self.pseudo_order)
926 .await?;
927 let pooled_conn = H2PooledConnection::new(h2_conn);
928
929 {
931 let mut pool = self.h2_pool.write().await;
932 pool.insert(pool_key, pooled_conn.clone());
933 }
934
935 let fut = pooled_conn.send_request(
937 request.method.clone(),
938 &uri,
939 headers_vec.clone(),
940 body_bytes.clone(),
941 );
942
943 let response = if let Some(ttfb_timeout) = timeouts.ttfb {
944 tokio_timeout(ttfb_timeout, fut)
945 .await
946 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))?
947 } else {
948 fut.await
949 }?;
950
951 if h3_upgrade_enabled {
953 if let Some(alt_svc) = response.get_header("alt-svc") {
954 alt_svc_cache.parse_and_store(&origin, alt_svc).await;
955 }
956 }
957
958 return Ok(response.with_url(request_url));
959 }
960 }
962
963 let pool_key = Self::make_pool_key(&uri);
965
966 let mut stream_opt = self.h1_pool.get_h1(&pool_key).await;
968 let mut used_pooled = stream_opt.is_some();
969
970 let mut stream = if let Some(pooled_stream) = stream_opt.take() {
972 tracing::debug!("H1: Reusing pooled connection for {:?}", pool_key);
973 pooled_stream
974 } else {
975 tracing::debug!("H1: Creating new connection for {:?}", pool_key);
976 let connector = self.connector_for_uri(&uri);
978 let connect_fut = connector.connect(&uri);
979 if let Some(connect_timeout) = timeouts.connect {
980 tokio_timeout(connect_timeout, connect_fut)
981 .await
982 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
983 } else {
984 connect_fut.await?
985 }
986 };
987
988 let server_wants_h2 = if let MaybeHttpsStream::Https(ref ssl_stream) = stream {
991 ssl_stream.ssl().selected_alpn_protocol() == Some(b"h2")
992 } else {
993 false
994 };
995
996 let response = if server_wants_h2 {
997 tracing::debug!("Server selected h2 via ALPN, upgrading to HTTP/2");
999
1000 let h2_conn =
1001 H2Connection::connect(stream, self.http2_settings.clone(), self.pseudo_order)
1002 .await?;
1003 let pooled_conn = H2PooledConnection::new(h2_conn);
1004
1005 {
1007 let mut pool = self.h2_pool.write().await;
1008 pool.insert(pool_key, pooled_conn.clone());
1009 }
1010
1011 let fut = pooled_conn.send_request(
1013 request.method.clone(),
1014 &uri,
1015 headers_vec.clone(),
1016 body_bytes.clone(),
1017 );
1018
1019 if let Some(ttfb_timeout) = timeouts.ttfb {
1020 tokio_timeout(ttfb_timeout, fut)
1021 .await
1022 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))?
1023 } else {
1024 fut.await
1025 }?
1026 } else {
1027 let result = loop {
1031 let stream_for_request = stream;
1032 let fut = Self::do_send_http1(
1033 stream_for_request,
1034 request.method.clone(),
1035 &uri,
1036 headers_vec.clone(),
1037 body_bytes.clone(),
1038 );
1039
1040 let request_result = if let Some(ttfb_timeout) = timeouts.ttfb {
1042 tokio_timeout(ttfb_timeout, fut)
1043 .await
1044 .map_err(|_| Error::TtfbTimeout(ttfb_timeout))?
1045 } else {
1046 fut.await
1047 };
1048
1049 match request_result {
1050 Ok((resp, returned_stream)) => {
1051 self.h1_pool.put_h1(pool_key.clone(), returned_stream).await;
1053 break Ok(resp);
1054 }
1055 Err(e) => {
1056 if used_pooled {
1058 tracing::debug!(
1059 "H1: Pooled connection failed for {:?}, creating new: {}",
1060 pool_key,
1061 e
1062 );
1063 let connector = self.connector_for_uri(&uri);
1065 let connect_fut = connector.connect(&uri);
1066 stream = if let Some(connect_timeout) = timeouts.connect {
1067 tokio_timeout(connect_timeout, connect_fut)
1068 .await
1069 .map_err(|_| Error::ConnectTimeout(connect_timeout))??
1070 } else {
1071 connect_fut.await?
1072 };
1073 used_pooled = false; continue;
1075 } else {
1076 tracing::debug!(
1078 "H1: Request failed for {:?}, discarding connection: {}",
1079 pool_key,
1080 e
1081 );
1082 break Err(e);
1083 }
1084 }
1085 }
1086 };
1087
1088 result?
1089 };
1090
1091 if h3_upgrade_enabled {
1093 if let Some(alt_svc) = response.get_header("alt-svc") {
1094 alt_svc_cache.parse_and_store(&origin, alt_svc).await;
1095 }
1096 }
1097
1098 Ok(response.with_url(request_url))
1099 }
1100
1101 fn redirect_request(&self, request: &Request, response: &Response, next_url: Url) -> Request {
1102 let status = response.status().as_u16();
1103 let mut method = request.method.clone();
1104 let mut body = request.body.clone();
1105 let mut headers = request.headers.clone();
1106
1107 let should_switch = status == 303
1108 || ((status == 301 || status == 302) && !matches!(method, Method::GET | Method::HEAD));
1109
1110 if should_switch {
1111 method = Method::GET;
1112 body = Body::Empty;
1113 headers.remove("content-length");
1114 headers.remove("content-type");
1115 }
1116
1117 if Self::is_cross_origin(&request.url, &next_url) {
1118 headers.remove("authorization");
1119 }
1120
1121 Request {
1122 method,
1123 url: next_url,
1124 headers,
1125 body,
1126 version: request.version,
1127 timeout: request.timeout,
1128 }
1129 }
1130
1131 async fn apply_cookie_header(&self, request: &Request, headers: &mut Headers) -> bool {
1132 if let Some(jar) = &self.cookie_store {
1133 if !headers.contains("cookie") {
1134 if let Some(cookie_header) =
1135 jar.read().await.build_cookie_header(request.url.as_str())
1136 {
1137 headers.insert("Cookie", cookie_header);
1138 return true;
1139 }
1140 }
1141 }
1142 false
1143 }
1144
1145 async fn store_cookies(&self, response: &Response, url: &Url) {
1146 if let Some(jar) = &self.cookie_store {
1147 jar.write()
1148 .await
1149 .store_from_headers(response.headers(), url.as_str());
1150 }
1151 }
1152
1153 fn make_pool_key(uri: &Uri) -> PoolKey {
1155 let host = uri.host().unwrap_or("localhost").to_string();
1156 let is_https = uri.scheme_str() == Some("https");
1157 let port = uri.port_u16().unwrap_or(if is_https { 443 } else { 80 });
1158 PoolKey::new(host, port, is_https)
1159 }
1160
1161 async fn do_send_http1(
1162 stream: MaybeHttpsStream,
1163 method: Method,
1164 uri: &Uri,
1165 headers: Vec<(String, String)>,
1166 body: Option<Bytes>,
1167 ) -> Result<(Response, MaybeHttpsStream)> {
1168 let mut conn = H1Connection::new(stream);
1169 let response = conn.send_request(method, uri, headers, body).await?;
1170 let stream = conn.into_inner();
1171 Ok((response, stream))
1172 }
1173
1174 fn origin_for_url(url: &Url) -> String {
1176 let scheme = url.scheme();
1177 let host = url.host_str().unwrap_or("localhost");
1178 let port = url
1179 .port_or_known_default()
1180 .unwrap_or(if scheme == "https" { 443 } else { 80 });
1181
1182 if (scheme == "https" && port == 443) || (scheme == "http" && port == 80) {
1183 format!("{}://{}", scheme, host)
1184 } else {
1185 format!("{}://{}:{}", scheme, host, port)
1186 }
1187 }
1188
1189 fn is_cross_origin(a: &Url, b: &Url) -> bool {
1190 a.scheme() != b.scheme()
1191 || a.host_str() != b.host_str()
1192 || a.port_or_known_default() != b.port_or_known_default()
1193 }
1194}
1195
1196impl ClientBuilder {
1197 pub fn new() -> Self {
1206 Self {
1207 fingerprint: FingerprintProfile::default(),
1208 http2_settings: None,
1209 pseudo_order: PseudoHeaderOrder::Chrome,
1210 timeouts: Timeouts::default(),
1211 prefer_http2: true, h3_upgrade_enabled: true, http2_prior_knowledge: false,
1214 root_certs: Vec::new(),
1215 use_platform_roots: false,
1216 danger_accept_invalid_certs: false,
1217 localhost_allows_invalid_certs: true, default_headers: Headers::new(),
1219 redirect_policy: RedirectPolicy::None,
1220 cookie_store: None,
1221 }
1222 }
1223
1224 pub fn fingerprint(mut self, fingerprint: FingerprintProfile) -> Self {
1226 self.fingerprint = fingerprint;
1227 self
1228 }
1229
1230 pub fn http2_settings(mut self, settings: Http2Settings) -> Self {
1232 self.http2_settings = Some(settings);
1233 self
1234 }
1235
1236 pub fn pseudo_order(mut self, order: PseudoHeaderOrder) -> Self {
1238 self.pseudo_order = order;
1239 self
1240 }
1241
1242 pub fn timeouts(mut self, timeouts: Timeouts) -> Self {
1246 self.timeouts = timeouts;
1247 self
1248 }
1249
1250 pub fn api_timeouts(mut self) -> Self {
1254 self.timeouts = Timeouts::api_defaults();
1255 self
1256 }
1257
1258 pub fn streaming_timeouts(mut self) -> Self {
1263 self.timeouts = Timeouts::streaming_defaults();
1264 self
1265 }
1266
1267 #[deprecated(
1272 since = "1.0.2",
1273 note = "Use `timeouts()` or `total_timeout()` instead"
1274 )]
1275 pub fn timeout(mut self, timeout: Duration) -> Self {
1276 self.timeouts.total = Some(timeout);
1277 self
1278 }
1279
1280 pub fn total_timeout(mut self, timeout: Duration) -> Self {
1282 self.timeouts.total = Some(timeout);
1283 self
1284 }
1285
1286 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
1288 self.timeouts.connect = Some(timeout);
1289 self
1290 }
1291
1292 pub fn ttfb_timeout(mut self, timeout: Duration) -> Self {
1294 self.timeouts.ttfb = Some(timeout);
1295 self
1296 }
1297
1298 pub fn read_timeout(mut self, timeout: Duration) -> Self {
1300 self.timeouts.read_idle = Some(timeout);
1301 self
1302 }
1303
1304 pub fn write_timeout(mut self, timeout: Duration) -> Self {
1306 self.timeouts.write_idle = Some(timeout);
1307 self
1308 }
1309
1310 pub fn pool_acquire_timeout(mut self, timeout: Duration) -> Self {
1312 self.timeouts.pool_acquire = Some(timeout);
1313 self
1314 }
1315
1316 pub fn default_headers(mut self, headers: impl Into<Headers>) -> Self {
1318 self.default_headers = headers.into();
1319 self
1320 }
1321
1322 pub fn default_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
1324 self.default_headers.insert(name, value);
1325 self
1326 }
1327
1328 pub fn user_agent(mut self, value: impl Into<String>) -> Self {
1330 self.default_headers.insert("User-Agent", value.into());
1331 self
1332 }
1333
1334 pub fn redirect_policy(mut self, policy: RedirectPolicy) -> Self {
1336 self.redirect_policy = policy;
1337 self
1338 }
1339
1340 pub fn cookie_store(mut self, enabled: bool) -> Self {
1342 if enabled {
1343 self.cookie_store = Some(Arc::new(RwLock::new(CookieJar::new())));
1344 } else {
1345 self.cookie_store = None;
1346 }
1347 self
1348 }
1349
1350 pub fn cookie_jar(mut self, jar: Arc<RwLock<CookieJar>>) -> Self {
1352 self.cookie_store = Some(jar);
1353 self
1354 }
1355
1356 pub fn prefer_http2(mut self, prefer: bool) -> Self {
1358 self.prefer_http2 = prefer;
1359 self
1360 }
1361
1362 pub fn h3_upgrade(mut self, enabled: bool) -> Self {
1369 self.h3_upgrade_enabled = enabled;
1370 self
1371 }
1372
1373 pub fn http2_prior_knowledge(mut self, enabled: bool) -> Self {
1376 self.http2_prior_knowledge = enabled;
1377 if enabled {
1379 self.prefer_http2 = true;
1380 }
1381 self
1382 }
1383
1384 pub fn add_root_certificate(mut self, cert: Vec<u8>) -> Self {
1386 self.root_certs.push(cert);
1387 self
1388 }
1389
1390 pub fn with_platform_roots(mut self, enabled: bool) -> Self {
1401 self.use_platform_roots = enabled;
1402 self
1403 }
1404
1405 pub fn danger_accept_invalid_certs(mut self, accept: bool) -> Self {
1411 self.danger_accept_invalid_certs = accept;
1412 self
1413 }
1414
1415 pub fn localhost_allows_invalid_certs(mut self, allow: bool) -> Self {
1423 self.localhost_allows_invalid_certs = allow;
1424 self
1425 }
1426
1427 pub fn build(self) -> Result<Client> {
1429 let tls_fingerprint = self.fingerprint.tls_fingerprint();
1431 let mut connector = BoringConnector::with_fingerprint(tls_fingerprint.clone())
1432 .with_root_certificates(self.root_certs.clone())
1433 .with_platform_roots(self.use_platform_roots);
1434
1435 if self.danger_accept_invalid_certs {
1437 connector = connector.danger_accept_invalid_certs(true);
1438 }
1439
1440 let insecure_connector = BoringConnector::with_fingerprint(tls_fingerprint.clone())
1442 .with_root_certificates(self.root_certs)
1443 .with_platform_roots(self.use_platform_roots)
1444 .danger_accept_invalid_certs(true);
1445
1446 let h3_client = H3Client::with_fingerprint(tls_fingerprint);
1448
1449 let http2_settings = self.http2_settings.unwrap_or_default();
1451
1452 let default_version = if self.prefer_http2 {
1454 HttpVersion::Http2
1455 } else {
1456 HttpVersion::Http1_1
1457 };
1458
1459 Ok(Client {
1460 connector,
1461 insecure_connector,
1462 h3_client,
1463 alt_svc_cache: Arc::new(AltSvcCache::new()),
1464 h2_pool: Arc::new(RwLock::new(HashMap::new())),
1465 h1_pool: Arc::new(ConnectionPool::new()),
1466 http2_settings,
1467 pseudo_order: self.pseudo_order,
1468 default_version,
1469 timeouts: self.timeouts,
1470 h3_upgrade_enabled: self.h3_upgrade_enabled,
1471 http2_prior_knowledge: self.http2_prior_knowledge,
1472 danger_accept_invalid_certs: self.danger_accept_invalid_certs,
1473 localhost_allows_invalid_certs: self.localhost_allows_invalid_certs,
1474 default_headers: self.default_headers,
1475 redirect_policy: self.redirect_policy,
1476 cookie_store: self.cookie_store,
1477 })
1478 }
1479}
1480
1481impl Default for ClientBuilder {
1482 fn default() -> Self {
1483 Self::new()
1484 }
1485}
1486
1487impl Default for AltSvcCache {
1488 fn default() -> Self {
1489 Self::new()
1490 }
1491}