1use bytes::Bytes;
61pub use http::{header, Method, StatusCode, Version};
62use http_body_util::{BodyExt, Full};
63use hyper::body::Incoming;
64use hyper::Request;
65use hyper_util::client::legacy::Client as HyperClient;
66use hyper_util::rt::TokioExecutor;
67use std::io::Read;
68use std::time::{Duration, Instant, SystemTime};
69use tracing::*;
70
71use crate::masking;
72
73#[cfg(feature = "cookies")]
74use std::collections::HashMap;
75
76pub trait IntoUrl {
79 fn into_url_string(self) -> String;
80}
81
82impl IntoUrl for &str {
83 fn into_url_string(self) -> String {
84 self.to_string()
85 }
86}
87
88impl IntoUrl for String {
89 fn into_url_string(self) -> String {
90 self
91 }
92}
93
94impl IntoUrl for &String {
95 fn into_url_string(self) -> String {
96 self.clone()
97 }
98}
99
100impl IntoUrl for url::Url {
101 fn into_url_string(self) -> String {
102 self.to_string()
103 }
104}
105
106impl IntoUrl for &url::Url {
107 fn into_url_string(self) -> String {
108 self.to_string()
109 }
110}
111
112#[cfg(feature = "multipart")]
113#[derive(Debug)]
114pub struct MultipartForm {
115 }
118
119#[derive(Debug, thiserror::Error)]
120pub enum Error {
121 #[error("HttpError: {0}")]
122 Http(#[from] hyper::Error),
123 #[error("HttpError: {0}")]
124 HttpLegacy(#[from] hyper_util::client::legacy::Error),
125 #[error("UriError: {0}")]
126 Uri(#[from] http::uri::InvalidUri),
127 #[error("HeaderError: {0}")]
128 Header(#[from] http::Error),
129 #[error("TlsError: {0}")]
130 Tls(#[from] hyper_tls::native_tls::Error),
131 #[error("Request timed out after {0:?}")]
132 Timeout(Duration),
133 #[error("failed to deserialize http response into the specified type: {0}")]
134 Deserialize(#[from] serde_json::Error),
135 #[error("{0:#}")]
136 Unexpected(#[from] eyre::Error),
137}
138
139#[derive(Debug, Clone)]
140pub struct LogRequest {
141 pub url: url::Url,
142 pub method: Method,
143 pub headers: header::HeaderMap,
144}
145
146#[derive(Debug, Clone, Default)]
147pub struct LogResponse {
148 pub headers: header::HeaderMap,
149 pub body: String,
150 pub status: StatusCode,
151 pub duration_req: Duration,
152}
153
154#[derive(Debug, Clone)]
155pub struct Log {
156 pub request: LogRequest,
157 pub response: LogResponse,
158 pub started_at: SystemTime,
159 pub ended_at: SystemTime,
160}
161
162#[derive(Debug, Clone)]
191pub struct Response {
192 pub headers: header::HeaderMap,
193 pub status: StatusCode,
194 pub text: String,
195 pub url: url::Url,
196 #[cfg(feature = "cookies")]
197 cookies: Vec<cookie::Cookie<'static>>,
198}
199
200impl Response {
201 pub fn status(&self) -> StatusCode {
211 self.status
212 }
213
214 pub fn headers(&self) -> &header::HeaderMap {
224 &self.headers
225 }
226
227 pub fn url(&self) -> &url::Url {
236 &self.url
237 }
238
239 pub async fn text(self) -> Result<String, Error> {
248 Ok(self.text)
249 }
250
251 pub async fn json<T: serde::de::DeserializeOwned>(self) -> Result<T, Error> {
267 Ok(serde_json::from_str(&self.text)?)
268 }
269
270 #[cfg(feature = "cookies")]
271 pub fn cookies(&self) -> impl Iterator<Item = &cookie::Cookie<'static>> + '_ {
272 self.cookies.iter()
273 }
274
275 async fn from(res: hyper::Response<Incoming>, url: url::Url) -> Result<Self, Error> {
276 let headers = res.headers().clone();
277 let status = res.status();
278
279 #[cfg(feature = "cookies")]
280 let cookies: Vec<cookie::Cookie<'static>> = headers
281 .get_all("set-cookie")
282 .iter()
283 .filter_map(|cookie_header| {
284 cookie_header.to_str().ok().and_then(|cookie_str| {
285 cookie::Cookie::parse(cookie_str)
286 .ok()
287 .map(|c| c.into_owned())
288 })
289 })
290 .collect();
291
292 let body_bytes = res.into_body().collect().await?.to_bytes();
293
294 let text = Self::decompress_body(&headers, &body_bytes);
296
297 Ok(Response {
298 headers,
299 status,
300 url,
301 text,
302 #[cfg(feature = "cookies")]
303 cookies,
304 })
305 }
306
307 fn decompress_body(headers: &header::HeaderMap, body_bytes: &Bytes) -> String {
308 match headers
309 .get("content-encoding")
310 .and_then(|v| v.to_str().ok())
311 {
312 Some("gzip") => {
313 use flate2::read::GzDecoder;
314 let mut decoder = GzDecoder::new(body_bytes.as_ref());
315 let mut decompressed = Vec::new();
316 match decoder.read_to_end(&mut decompressed) {
317 Ok(_) => String::from_utf8_lossy(&decompressed).to_string(),
318 Err(_) => String::from_utf8_lossy(body_bytes).to_string(),
319 }
320 }
321 Some("deflate") => {
322 use flate2::read::{DeflateDecoder, ZlibDecoder};
323
324 let mut zlib_decoder = ZlibDecoder::new(body_bytes.as_ref());
326 let mut decompressed = Vec::new();
327 match zlib_decoder.read_to_end(&mut decompressed) {
328 Ok(_) => String::from_utf8_lossy(&decompressed).to_string(),
329 Err(_) => {
330 let mut deflate_decoder = DeflateDecoder::new(body_bytes.as_ref());
332 let mut decompressed = Vec::new();
333 match deflate_decoder.read_to_end(&mut decompressed) {
334 Ok(_) => String::from_utf8_lossy(&decompressed).to_string(),
335 Err(_) => String::from_utf8_lossy(body_bytes).to_string(),
336 }
337 }
338 }
339 }
340 Some("br") => {
341 let mut decompressed = Vec::new();
342 match brotli::Decompressor::new(body_bytes.as_ref(), 4096)
343 .read_to_end(&mut decompressed)
344 {
345 Ok(_) => String::from_utf8_lossy(&decompressed).to_string(),
346 Err(_) => String::from_utf8_lossy(body_bytes).to_string(),
347 }
348 }
349 Some("zstd") => match zstd::decode_all(body_bytes.as_ref()) {
350 Ok(decompressed) => String::from_utf8_lossy(&decompressed).to_string(),
351 Err(_) => String::from_utf8_lossy(body_bytes).to_string(),
352 },
353 _ => String::from_utf8_lossy(body_bytes).to_string(),
354 }
355 }
356}
357
358#[derive(Clone)]
390pub struct Client {
391 pub(crate) inner: HyperClient<
392 hyper_tls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
393 Full<Bytes>,
394 >,
395 #[cfg(feature = "cookies")]
396 pub(crate) cookie_store:
397 std::sync::Arc<tokio::sync::RwLock<HashMap<String, Vec<cookie::Cookie<'static>>>>>,
398}
399
400impl Default for Client {
401 fn default() -> Self {
402 Self::new()
403 }
404}
405
406impl Client {
407 pub fn new() -> Client {
421 let https = hyper_tls::HttpsConnector::new();
422 let inner = HyperClient::builder(TokioExecutor::new()).build::<_, Full<Bytes>>(https);
423
424 Client {
425 inner,
426 #[cfg(feature = "cookies")]
427 cookie_store: std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())),
428 }
429 }
430
431 pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
432 let url_str = url.into_url_string();
433 debug!("Requesting {url_str}");
434 RequestBuilder::new(self.clone(), Method::GET, &url_str)
435 }
436
437 pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder {
438 let url_str = url.into_url_string();
439 debug!("Requesting {url_str}");
440 RequestBuilder::new(self.clone(), Method::POST, &url_str)
441 }
442
443 pub fn put<U: IntoUrl>(&self, url: U) -> RequestBuilder {
444 let url_str = url.into_url_string();
445 debug!("Requesting {url_str}");
446 RequestBuilder::new(self.clone(), Method::PUT, &url_str)
447 }
448
449 pub fn patch<U: IntoUrl>(&self, url: U) -> RequestBuilder {
450 let url_str = url.into_url_string();
451 debug!("Requesting {url_str}");
452 RequestBuilder::new(self.clone(), Method::PATCH, &url_str)
453 }
454
455 pub fn delete<U: IntoUrl>(&self, url: U) -> RequestBuilder {
456 let url_str = url.into_url_string();
457 debug!("Requesting {url_str}");
458 RequestBuilder::new(self.clone(), Method::DELETE, &url_str)
459 }
460
461 pub fn head<U: IntoUrl>(&self, url: U) -> RequestBuilder {
462 let url_str = url.into_url_string();
463 debug!("Requesting {url_str}");
464 RequestBuilder::new(self.clone(), Method::HEAD, &url_str)
465 }
466}
467
468pub struct RequestBuilder {
469 client: Client,
470 method: Method,
471 url: String,
472 headers: header::HeaderMap,
473 body: Option<Vec<u8>>,
474 query_params: Vec<(String, String)>,
475 timeout: Option<Duration>,
476}
477
478impl RequestBuilder {
479 fn new(client: Client, method: Method, url: &str) -> Self {
480 Self {
481 client,
482 method,
483 url: url.to_string(),
484 headers: header::HeaderMap::new(),
485 body: None,
486 query_params: Vec::new(),
487 timeout: None,
488 }
489 }
490
491 pub fn header<K, V>(mut self, key: K, value: V) -> Self
492 where
493 header::HeaderName: TryFrom<K>,
494 <header::HeaderName as TryFrom<K>>::Error: Into<http::Error>,
495 header::HeaderValue: TryFrom<V>,
496 <header::HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
497 {
498 if let (Ok(name), Ok(val)) = (
499 header::HeaderName::try_from(key),
500 header::HeaderValue::try_from(value),
501 ) {
502 self.headers.insert(name, val);
503 }
504 self
505 }
506
507 pub fn headers(mut self, headers: header::HeaderMap) -> Self {
508 self.headers.extend(headers);
509 self
510 }
511
512 pub fn basic_auth<U, P>(mut self, username: U, password: Option<P>) -> Self
513 where
514 U: std::fmt::Display,
515 P: std::fmt::Display,
516 {
517 let auth_value = match password {
518 Some(p) => format!("{username}:{p}"),
519 None => username.to_string(),
520 };
521 let encoded = base64::Engine::encode(
522 &base64::engine::general_purpose::STANDARD,
523 auth_value.as_bytes(),
524 );
525 let auth_header = format!("Basic {encoded}");
526
527 if let Ok(header_value) = header::HeaderValue::from_str(&auth_header) {
528 self.headers.insert(header::AUTHORIZATION, header_value);
529 }
530 self
531 }
532
533 pub fn bearer_auth<T>(mut self, token: T) -> Self
534 where
535 T: std::fmt::Display,
536 {
537 let auth_header = format!("Bearer {token}");
538 if let Ok(header_value) = header::HeaderValue::from_str(&auth_header) {
539 self.headers.insert(header::AUTHORIZATION, header_value);
540 }
541 self
542 }
543
544 pub fn body<T: Into<Vec<u8>>>(mut self, body: T) -> Self {
545 self.body = Some(body.into());
546 self
547 }
548
549 pub fn query<T: serde::Serialize + ?Sized>(mut self, query: &T) -> Self {
550 if let Ok(params) = serde_urlencoded::to_string(query) {
551 for pair in params.split('&') {
552 if let Some((key, value)) = pair.split_once('=') {
553 self.query_params.push((key.to_string(), value.to_string()));
554 }
555 }
556 }
557 self
558 }
559
560 pub fn form<T: serde::Serialize + ?Sized>(mut self, form: &T) -> Self {
561 if let Ok(body) = serde_urlencoded::to_string(form) {
562 self.body = Some(body.into_bytes());
563 self.headers.insert(
564 header::CONTENT_TYPE,
565 header::HeaderValue::from_static("application/x-www-form-urlencoded"),
566 );
567 }
568 self
569 }
570
571 #[cfg(feature = "json")]
572 pub fn json<T: serde::Serialize + ?Sized>(mut self, json: &T) -> Self {
573 if let Ok(body) = serde_json::to_string(json) {
574 self.body = Some(body.into_bytes());
575 self.headers.insert(
576 header::CONTENT_TYPE,
577 header::HeaderValue::from_static("application/json"),
578 );
579 }
580 self
581 }
582
583 #[cfg(feature = "multipart")]
584 pub fn multipart(self, _multipart: MultipartForm) -> Self {
585 self
588 }
589
590 pub async fn send(self) -> Result<Response, Error> {
591 let mut url = self.url.clone();
592
593 if !self.query_params.is_empty() {
595 let query_string: String = self
596 .query_params
597 .iter()
598 .map(|(k, v)| format!("{k}={v}"))
599 .collect::<Vec<_>>()
600 .join("&");
601
602 url = if url.contains('?') {
603 format!("{url}&{query_string}")
604 } else {
605 format!("{url}?{query_string}")
606 };
607 }
608
609 let parsed_url = url::Url::parse(&url).map_err(|e| eyre::eyre!("Invalid URL: {}", e))?;
610 let uri: http::Uri = url.parse()?;
611
612 let mut req_builder = Request::builder().method(self.method.clone()).uri(uri);
613
614 for (name, value) in &self.headers {
616 req_builder = req_builder.header(name, value);
617 }
618
619 #[cfg(feature = "cookies")]
620 {
621 let cookie_store = self.client.cookie_store.read().await;
623 if let Some(domain_cookies) = cookie_store.get(parsed_url.host_str().unwrap_or("")) {
624 if !domain_cookies.is_empty() {
625 let cookie_header = domain_cookies
626 .iter()
627 .map(|cookie| format!("{}={}", cookie.name(), cookie.value()))
628 .collect::<Vec<_>>()
629 .join("; ");
630
631 if let Ok(cookie_value) = header::HeaderValue::from_str(&cookie_header) {
632 req_builder = req_builder.header(header::COOKIE, cookie_value);
633 }
634 }
635 }
636 }
637
638 let body = match &self.body {
639 Some(ref body_data) => Full::new(Bytes::from(body_data.clone())),
640 None => Full::new(Bytes::new()),
641 };
642
643 let req = req_builder.body(body)?;
644
645 let log_request = LogRequest {
646 url: if masking::should_mask_sensitive() {
647 masking::mask_url(&parsed_url)
648 } else {
649 parsed_url.clone()
650 },
651 method: self.method.clone(),
652 headers: if masking::should_mask_sensitive() {
653 masking::mask_headers(&self.headers)
654 } else {
655 self.headers.clone()
656 },
657 };
658
659 let started_at = SystemTime::now();
660 let time_req = Instant::now();
661
662 let res = match self.timeout {
664 Some(timeout) => {
665 match tokio::time::timeout(timeout, self.client.inner.request(req)).await {
666 Ok(result) => result,
667 Err(_) => return Err(Error::Timeout(timeout)),
668 }
669 }
670 None => self.client.inner.request(req).await,
671 };
672 let ended_at = SystemTime::now();
673
674 match res {
675 Ok(res) => {
676 let status = res.status();
677
678 if status.is_redirection() {
680 return Self::follow_redirects(
681 self.client.clone(),
682 self.headers.clone(),
683 self.method.clone(),
684 self.body.clone(),
685 res,
686 parsed_url,
687 log_request,
688 started_at,
689 time_req,
690 10,
691 )
692 .await;
693 }
694
695 let response = Response::from(res, parsed_url).await?;
696 let duration_req = time_req.elapsed();
697
698 #[cfg(feature = "cookies")]
699 {
700 if !response.cookies.is_empty() {
702 let mut cookie_store = self.client.cookie_store.write().await;
703 let domain = response.url().host_str().unwrap_or("").to_string();
704 cookie_store.insert(domain, response.cookies.clone());
705 }
706 }
707
708 let log_response = LogResponse {
709 headers: if masking::should_mask_sensitive() {
710 masking::mask_headers(&response.headers)
711 } else {
712 response.headers.clone()
713 },
714 body: response.text.clone(),
715 status: response.status(),
716 duration_req,
717 };
718
719 crate::runner::publish(crate::runner::EventBody::Http(Box::new(Log {
720 request: log_request,
721 response: log_response,
722 started_at,
723 ended_at,
724 })))?;
725 Ok(response)
726 }
727 Err(e) => {
728 crate::runner::publish(crate::runner::EventBody::Http(Box::new(Log {
729 request: log_request,
730 response: Default::default(),
731 started_at,
732 ended_at,
733 })))?;
734 Err(e.into())
735 }
736 }
737 }
738
739 #[allow(clippy::too_many_arguments)]
740 async fn follow_redirects(
741 client: Client,
742 headers: header::HeaderMap,
743 mut method: Method,
744 body: Option<Vec<u8>>,
745 mut response: hyper::Response<Incoming>,
746 mut current_url: url::Url,
747 original_request: LogRequest,
748 started_at: SystemTime,
749 start_time: Instant,
750 max_redirects: u8,
751 ) -> Result<Response, Error> {
752 let mut redirect_count = 0;
753
754 loop {
755 let status = response.status();
756
757 if !status.is_redirection() || redirect_count >= max_redirects {
758 let ended_at = SystemTime::now();
759 let final_response = Response::from(response, current_url).await?;
760 let duration_req = start_time.elapsed();
761
762 #[cfg(feature = "cookies")]
763 {
764 if !final_response.cookies.is_empty() {
765 let mut cookie_store = client.cookie_store.write().await;
766 let domain = final_response.url().host_str().unwrap_or("").to_string();
767 cookie_store.insert(domain, final_response.cookies.clone());
768 }
769 }
770
771 let log_response = LogResponse {
772 headers: if masking::should_mask_sensitive() {
773 masking::mask_headers(&final_response.headers)
774 } else {
775 final_response.headers.clone()
776 },
777 body: final_response.text.clone(),
778 status: final_response.status(),
779 duration_req,
780 };
781
782 crate::runner::publish(crate::runner::EventBody::Http(Box::new(Log {
783 request: original_request,
784 response: log_response,
785 started_at,
786 ended_at,
787 })))?;
788
789 return Ok(final_response);
790 }
791
792 #[cfg(feature = "cookies")]
794 {
795 let redirect_cookies: Vec<cookie::Cookie<'static>> = response
796 .headers()
797 .get_all("set-cookie")
798 .iter()
799 .filter_map(|cookie_header| {
800 cookie_header.to_str().ok().and_then(|cookie_str| {
801 cookie::Cookie::parse(cookie_str)
802 .ok()
803 .map(|c| c.into_owned())
804 })
805 })
806 .collect();
807
808 if !redirect_cookies.is_empty() {
809 let mut cookie_store = client.cookie_store.write().await;
810 let domain = current_url.host_str().unwrap_or("").to_string();
811 let existing_cookies =
812 cookie_store.entry(domain.clone()).or_insert_with(Vec::new);
813 existing_cookies.extend(redirect_cookies);
814 }
815 }
816
817 let location = match response
819 .headers()
820 .get("location")
821 .and_then(|v| v.to_str().ok())
822 {
823 Some(loc) => loc,
824 None => {
825 let ended_at = SystemTime::now();
827 let final_response = Response::from(response, current_url).await?;
828 let duration_req = start_time.elapsed();
829
830 let log_response = LogResponse {
831 headers: if masking::should_mask_sensitive() {
832 masking::mask_headers(&final_response.headers)
833 } else {
834 final_response.headers.clone()
835 },
836 body: final_response.text.clone(),
837 status: final_response.status(),
838 duration_req,
839 };
840
841 crate::runner::publish(crate::runner::EventBody::Http(Box::new(Log {
842 request: original_request,
843 response: log_response,
844 started_at,
845 ended_at,
846 })))?;
847
848 return Ok(final_response);
849 }
850 };
851
852 current_url = if location.starts_with("http") {
854 url::Url::parse(location).map_err(|e| eyre::eyre!("Invalid redirect URL: {}", e))?
855 } else {
856 current_url
857 .join(location)
858 .map_err(|e| eyre::eyre!("Invalid redirect URL: {}", e))?
859 };
860
861 if status == StatusCode::SEE_OTHER
863 || (method == Method::POST
864 && (status == StatusCode::MOVED_PERMANENTLY || status == StatusCode::FOUND))
865 {
866 method = Method::GET;
867 }
868
869 let redirect_uri: http::Uri = current_url.to_string().parse()?;
871 let mut redirect_req_builder =
872 Request::builder().method(method.clone()).uri(redirect_uri);
873
874 for (name, value) in &headers {
876 redirect_req_builder = redirect_req_builder.header(name, value);
877 }
878
879 #[cfg(feature = "cookies")]
881 {
882 let cookie_store = client.cookie_store.read().await;
883 if let Some(domain_cookies) = cookie_store.get(current_url.host_str().unwrap_or(""))
884 {
885 if !domain_cookies.is_empty() {
886 let cookie_header = domain_cookies
887 .iter()
888 .map(|cookie| format!("{}={}", cookie.name(), cookie.value()))
889 .collect::<Vec<_>>()
890 .join("; ");
891
892 if let Ok(cookie_value) = header::HeaderValue::from_str(&cookie_header) {
893 redirect_req_builder =
894 redirect_req_builder.header(header::COOKIE, cookie_value);
895 }
896 }
897 }
898 }
899
900 let redirect_body = if method == Method::GET {
901 Full::new(Bytes::new())
902 } else {
903 match &body {
904 Some(body_data) => Full::new(Bytes::from(body_data.clone())),
905 None => Full::new(Bytes::new()),
906 }
907 };
908
909 let redirect_req = redirect_req_builder.body(redirect_body)?;
910 response = client.inner.request(redirect_req).await?;
911 redirect_count += 1;
912 }
913 }
914
915 pub fn timeout(mut self, timeout: Duration) -> Self {
916 self.timeout = Some(timeout);
917 self
918 }
919
920 pub fn try_clone(&self) -> Option<Self> {
921 Some(Self {
922 client: self.client.clone(),
923 method: self.method.clone(),
924 url: self.url.clone(),
925 headers: self.headers.clone(),
926 body: self.body.clone(),
927 query_params: self.query_params.clone(),
928 timeout: self.timeout,
929 })
930 }
931
932 pub fn version(self, _version: Version) -> Self {
933 self
936 }
937}