1use std::time::Duration;
18
19use async_stream::stream;
20use bytes::Bytes;
21use futures_util::StreamExt;
22use http::header::RETRY_AFTER;
23use http::{HeaderMap, StatusCode};
24use qubit_function::MutatingFunction;
25use qubit_retry::{
26 AttemptFailure, Jitter, RetryDecision, RetryError, RetryExecutor, RetryOptions, RetryResult,
27};
28use reqwest::Response;
29use tokio_util::sync::CancellationToken;
30use url::Host;
31use url::Url;
32
33use crate::{
34 AsyncHeaderInjector, HeaderInjector, HttpClientOptions, HttpError, HttpErrorKind, HttpLogger,
35 HttpRequest, HttpRequestBody, HttpRequestBuilder, HttpResponse, HttpResult, HttpRetryOptions,
36 HttpStreamResponse, RetryHint,
37};
38
39#[derive(Clone)]
42pub struct HttpClient {
43 client: reqwest::Client,
45 options: HttpClientOptions,
47 injectors: Vec<HeaderInjector>,
50 async_injectors: Vec<AsyncHeaderInjector>,
52}
53
54impl std::fmt::Debug for HttpClient {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 f.debug_struct("HttpClient")
68 .field("options", &self.options)
69 .field("injectors", &self.injectors)
70 .field("async_injectors", &self.async_injectors)
71 .finish_non_exhaustive()
72 }
73}
74
75impl HttpClient {
76 pub(crate) fn new(client: reqwest::Client, options: HttpClientOptions) -> Self {
87 Self {
88 client,
89 options,
90 injectors: Vec::new(),
91 async_injectors: Vec::new(),
92 }
93 }
94
95 pub fn options(&self) -> &HttpClientOptions {
101 &self.options
102 }
103
104 pub fn add_header_injector(&mut self, injector: HeaderInjector) {
113 self.injectors.push(injector);
114 }
115
116 pub fn add_async_header_injector(&mut self, injector: AsyncHeaderInjector) {
124 self.async_injectors.push(injector);
125 }
126
127 pub fn add_header(&mut self, name: &str, value: &str) -> HttpResult<&mut Self> {
142 self.options.add_header(name, value)?;
143 Ok(self)
144 }
145
146 pub fn add_headers<'a, I>(&mut self, headers: I) -> HttpResult<&mut Self>
160 where
161 I: IntoIterator<Item = (&'a str, &'a str)>,
162 {
163 self.options.add_headers(headers)?;
164 Ok(self)
165 }
166
167 pub fn clear_header_injectors(&mut self) {
172 self.injectors.clear();
173 }
174
175 pub fn clear_async_header_injectors(&mut self) {
180 self.async_injectors.clear();
181 }
182
183 pub fn request(&self, method: http::Method, path: &str) -> HttpRequestBuilder {
195 HttpRequestBuilder::new(method, path)
196 }
197
198 pub async fn execute(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
211 let retry_options = self.resolve_retry_options(&request);
212 let honor_retry_after = request.retry_override.should_honor_retry_after();
213 if !self.should_retry_request(&request, &retry_options) {
214 return self.execute_once(request).await;
215 }
216 self.execute_with_retry(request, retry_options, honor_retry_after)
217 .await
218 }
219
220 async fn execute_once(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
231 let url = self.resolve_url(&request)?;
232 let method = request.method.clone();
233 let cancellation_token = request.cancellation_token.clone();
234 if let Some(error) = cancelled_request_error_if_needed(
235 cancellation_token.as_ref(),
236 &method,
237 &url,
238 "Request cancelled before sending",
239 ) {
240 return Err(error);
241 }
242 let headers = self.build_headers(&request).await?;
243
244 let body_for_log = clone_request_body_for_log(&request.body);
245
246 let logger = HttpLogger::new(&self.options.logging, &self.options.sensitive_headers);
247 logger.log_request(&method, &url, &headers, body_for_log.as_ref());
248
249 let mut builder = self.client.request(method.clone(), url.clone());
250 builder = builder.headers(headers);
251
252 if !request.query.is_empty() {
253 builder = builder.query(&request.query);
254 }
255
256 if let Some(timeout) = request.request_timeout {
257 builder = builder.timeout(timeout);
258 }
259
260 builder = apply_request_body(builder, request.body);
261
262 let response = self
263 .send_with_write_timeout(
264 builder,
265 method.clone(),
266 url.clone(),
267 cancellation_token.as_ref(),
268 )
269 .await?;
270
271 if !response.status().is_success() {
272 let status = response.status();
273 let retry_after = parse_retry_after(status, response.headers());
274 let error = response.error_for_status_ref().expect_err(
275 "non-success HTTP status must produce reqwest status error via error_for_status_ref",
276 );
277 let body_preview = self.read_error_response_preview(response).await;
278 let message = format!(
279 "HTTP request failed with status {} for {} {}; response body preview: {}",
280 status, method, url, body_preview
281 );
282 let mut mapped = map_reqwest_error(
283 error,
284 HttpErrorKind::Status,
285 Some(method.clone()),
286 Some(url.clone()),
287 )
288 .with_status(status)
289 .with_response_body_preview(body_preview);
290 if let Some(retry_after) = retry_after {
291 mapped = mapped.with_retry_after(retry_after);
292 }
293 mapped.message = message;
294 return Err(mapped);
295 }
296
297 let status = response.status();
298 let response_url = response.url().clone();
299 let response_headers = response.headers().clone();
300
301 let body = self
302 .read_body_with_timeout(
303 response,
304 method.clone(),
305 response_url.clone(),
306 cancellation_token.as_ref(),
307 )
308 .await?;
309
310 logger.log_response(status, &response_url, &response_headers, &body);
311
312 Ok(HttpResponse::new(
313 status,
314 response_headers,
315 body,
316 response_url,
317 ))
318 }
319
320 pub async fn execute_stream(&self, request: HttpRequest) -> HttpResult<HttpStreamResponse> {
332 let retry_options = self.resolve_retry_options(&request);
333 let honor_retry_after = request.retry_override.should_honor_retry_after();
334 if !self.should_retry_request(&request, &retry_options) {
335 return self.execute_stream_once(request).await;
336 }
337 self.execute_stream_with_retry(request, retry_options, honor_retry_after)
338 .await
339 }
340
341 async fn execute_stream_once(&self, request: HttpRequest) -> HttpResult<HttpStreamResponse> {
352 let url = self.resolve_url(&request)?;
353 let method = request.method.clone();
354 let cancellation_token = request.cancellation_token.clone();
355 if let Some(error) = cancelled_request_error_if_needed(
356 cancellation_token.as_ref(),
357 &method,
358 &url,
359 "Streaming request cancelled before sending",
360 ) {
361 return Err(error);
362 }
363 let headers = self.build_headers(&request).await?;
364
365 let body_for_log = clone_request_body_for_log(&request.body);
366
367 let logger = HttpLogger::new(&self.options.logging, &self.options.sensitive_headers);
368 logger.log_request(&method, &url, &headers, body_for_log.as_ref());
369
370 let mut builder = self.client.request(method.clone(), url.clone());
371 builder = builder.headers(headers);
372
373 if !request.query.is_empty() {
374 builder = builder.query(&request.query);
375 }
376
377 if let Some(timeout) = request.request_timeout {
378 builder = builder.timeout(timeout);
379 }
380
381 builder = apply_request_body(builder, request.body);
382
383 let response = self
384 .send_with_write_timeout(
385 builder,
386 method.clone(),
387 url.clone(),
388 cancellation_token.as_ref(),
389 )
390 .await?;
391
392 if !response.status().is_success() {
393 let status = response.status();
394 let retry_after = parse_retry_after(status, response.headers());
395 let error = response.error_for_status_ref().expect_err(
396 "non-success HTTP status must produce reqwest status error via error_for_status_ref",
397 );
398 let body_preview = self.read_error_response_preview(response).await;
399 let message = format!(
400 "HTTP streaming request failed with status {} for {} {}; response body preview: {}",
401 status, method, url, body_preview
402 );
403 let mut mapped = map_reqwest_error(
404 error,
405 HttpErrorKind::Status,
406 Some(method.clone()),
407 Some(url.clone()),
408 )
409 .with_status(status)
410 .with_response_body_preview(body_preview);
411 if let Some(retry_after) = retry_after {
412 mapped = mapped.with_retry_after(retry_after);
413 }
414 mapped.message = message;
415 return Err(mapped);
416 }
417
418 let status = response.status();
419 let response_url = response.url().clone();
420 let response_headers = response.headers().clone();
421
422 logger.log_stream_response_headers(status, &response_url, &response_headers);
423
424 let read_timeout = self.options.timeouts.read_timeout;
425 let method_for_err = method.clone();
426 let url_for_err = response_url.clone();
427 let cancellation_token_for_stream = cancellation_token.clone();
428
429 let mut stream = response.bytes_stream();
430 let wrapped = stream! {
431 loop {
432 let next = if let Some(token) = &cancellation_token_for_stream {
433 tokio::select! {
434 _ = token.cancelled() => {
435 yield Err(HttpError::cancelled("Streaming response cancelled while reading body")
436 .with_method(method_for_err.clone())
437 .with_url(url_for_err.clone()));
438 break;
439 }
440 item = tokio::time::timeout(read_timeout, stream.next()) => item,
441 }
442 } else {
443 tokio::time::timeout(read_timeout, stream.next()).await
444 };
445 match next {
446 Ok(Some(Ok(bytes))) => yield Ok(bytes),
447 Ok(Some(Err(error))) => {
448 let mapped = map_reqwest_error(
449 error,
450 HttpErrorKind::Transport,
451 Some(method_for_err.clone()),
452 Some(url_for_err.clone()),
453 );
454 yield Err(mapped);
455 break;
456 }
457 Ok(None) => break,
458 Err(_) => {
459 let error = HttpError::read_timeout(format!(
460 "Read timeout after {:?} while streaming response",
461 read_timeout
462 ))
463 .with_method(method_for_err.clone())
464 .with_url(url_for_err.clone());
465 yield Err(error);
466 break;
467 }
468 }
469 }
470 };
471
472 Ok(HttpStreamResponse::new_with_sse_options(
473 status,
474 response_headers,
475 response_url,
476 Box::pin(wrapped),
477 self.options.sse_json_mode,
478 self.options.sse_max_line_bytes,
479 self.options.sse_max_frame_bytes,
480 ))
481 }
482
483 fn should_retry_request(
493 &self,
494 request: &HttpRequest,
495 retry_options: &HttpRetryOptions,
496 ) -> bool {
497 retry_options.max_attempts > 1 && retry_options.allows_method(&request.method)
498 }
499
500 fn resolve_retry_options(&self, request: &HttpRequest) -> HttpRetryOptions {
508 let mut options = self.options.retry.clone();
509 options.enabled = request.retry_override.resolve_enabled(options.enabled);
510 options.method_policy = request
511 .retry_override
512 .resolve_method_policy(options.method_policy);
513 options
514 }
515
516 fn build_retry_executor(
527 &self,
528 retry_options: &HttpRetryOptions,
529 honor_retry_after: bool,
530 ) -> HttpResult<RetryExecutor<HttpError>> {
531 let options = RetryOptions::new(
532 retry_options.max_attempts,
533 retry_options.max_duration,
534 retry_options.delay_strategy.clone(),
535 Jitter::factor(retry_options.jitter_factor),
536 )
537 .map_err(|error| HttpError::other(format!("Invalid HTTP retry options: {error}")))?;
538
539 let mut builder = RetryExecutor::<HttpError>::builder()
540 .options(options)
541 .classify_error(|error: &HttpError, _| {
542 if matches!(error.retry_hint(), RetryHint::Retryable) {
543 RetryDecision::Retry
544 } else {
545 RetryDecision::Abort
546 }
547 });
548 if honor_retry_after {
549 builder = builder.on_retry(|context, failure| {
550 let AttemptFailure::Error(error) = failure else {
551 return;
552 };
553 let Some(retry_after) = error.retry_after else {
554 return;
555 };
556 if retry_after > context.next_delay {
557 std::thread::sleep(retry_after - context.next_delay);
558 }
559 });
560 }
561 builder
562 .build()
563 .map_err(|error| HttpError::other(format!("Invalid HTTP retry executor: {error}")))
564 }
565
566 async fn execute_with_retry(
578 &self,
579 request: HttpRequest,
580 retry_options: HttpRetryOptions,
581 honor_retry_after: bool,
582 ) -> HttpResult<HttpResponse> {
583 let policy = self.build_retry_executor(&retry_options, honor_retry_after)?;
584 let client = self.clone();
585 let result = policy
586 .run_async(move || {
587 let client = client.clone();
588 let request = request.clone();
589 async move { client.execute_once(request).await }
590 })
591 .await;
592 map_retry_result(result)
593 }
594
595 async fn execute_stream_with_retry(
608 &self,
609 request: HttpRequest,
610 retry_options: HttpRetryOptions,
611 honor_retry_after: bool,
612 ) -> HttpResult<HttpStreamResponse> {
613 let policy = self.build_retry_executor(&retry_options, honor_retry_after)?;
614 let client = self.clone();
615 let result = policy
616 .run_async(move || {
617 let client = client.clone();
618 let request = request.clone();
619 async move { client.execute_stream_once(request).await }
620 })
621 .await;
622 map_retry_result(result)
623 }
624
625 fn resolve_url(&self, request: &HttpRequest) -> HttpResult<Url> {
633 if let Ok(url) = Url::parse(&request.path) {
634 self.validate_resolved_url_host(&url)?;
635 return Ok(url);
636 }
637
638 let base = self.options.base_url.as_ref().ok_or_else(|| {
639 HttpError::invalid_url(format!(
640 "Cannot resolve relative path '{}' without base_url",
641 request.path
642 ))
643 })?;
644
645 let url = base.join(&request.path).map_err(|error| {
646 HttpError::invalid_url(format!(
647 "Failed to resolve path '{}' against base URL '{}': {}",
648 request.path, base, error
649 ))
650 })?;
651 self.validate_resolved_url_host(&url)?;
652 Ok(url)
653 }
654
655 fn validate_resolved_url_host(&self, url: &Url) -> HttpResult<()> {
666 if self.options.ipv4_only && matches!(url.host(), Some(Host::Ipv6(_))) {
667 return Err(HttpError::invalid_url(format!(
668 "IPv6 literal host is not allowed when ipv4_only=true: {}",
669 url
670 )));
671 }
672 Ok(())
673 }
674
675 async fn build_headers(&self, request: &HttpRequest) -> HttpResult<HeaderMap> {
684 let mut headers = self.options.default_headers.clone();
685
686 for injector in &self.injectors {
687 injector.apply(&mut headers)?;
688 }
689 for injector in &self.async_injectors {
690 injector.apply(&mut headers).await?;
691 }
692
693 headers.extend(request.headers.clone());
694 Ok(headers)
695 }
696
697 async fn send_with_write_timeout(
710 &self,
711 builder: reqwest::RequestBuilder,
712 method: http::Method,
713 url: Url,
714 cancellation_token: Option<&CancellationToken>,
715 ) -> HttpResult<Response> {
716 let timeout = self.options.timeouts.write_timeout;
717 let send_future = tokio::time::timeout(timeout, builder.send());
718 let next = if let Some(token) = cancellation_token {
719 tokio::select! {
720 _ = token.cancelled() => {
721 return Err(HttpError::cancelled("Request cancelled while sending")
722 .with_method(method)
723 .with_url(url));
724 }
725 send_result = send_future => send_result,
726 }
727 } else {
728 send_future.await
729 };
730 match next {
731 Ok(Ok(response)) => Ok(response),
732 Ok(Err(error)) => Err(map_reqwest_error(
733 error,
734 HttpErrorKind::Transport,
735 Some(method),
736 Some(url),
737 )),
738 Err(_) => Err(HttpError::write_timeout(format!(
739 "Write timeout after {:?} while sending request",
740 timeout
741 ))
742 .with_method(method)
743 .with_url(url)),
744 }
745 }
746
747 async fn read_body_with_timeout(
757 &self,
758 response: Response,
759 method: http::Method,
760 url: Url,
761 cancellation_token: Option<&CancellationToken>,
762 ) -> HttpResult<Bytes> {
763 let timeout = self.options.timeouts.read_timeout;
764 let read_future = tokio::time::timeout(timeout, response.bytes());
765 let next = if let Some(token) = cancellation_token {
766 tokio::select! {
767 _ = token.cancelled() => {
768 return Err(HttpError::cancelled("Request cancelled while reading response body")
769 .with_method(method)
770 .with_url(url));
771 }
772 read_result = read_future => read_result,
773 }
774 } else {
775 read_future.await
776 };
777 match next {
778 Ok(Ok(body)) => Ok(body),
779 Ok(Err(error)) => Err(map_reqwest_error(
780 error,
781 HttpErrorKind::Decode,
782 Some(method),
783 Some(url),
784 )),
785 Err(_) => Err(HttpError::read_timeout(format!(
786 "Read timeout after {:?} while reading response body",
787 timeout
788 ))
789 .with_method(method)
790 .with_url(url)),
791 }
792 }
793
794 async fn read_error_response_preview(&self, mut response: Response) -> String {
802 let read_timeout = self.options.timeouts.read_timeout;
803 let max_bytes = self.options.logging.body_size_limit.max(1);
804 let mut preview = Vec::new();
805 let mut truncated = false;
806
807 loop {
808 let next = tokio::time::timeout(read_timeout, response.chunk()).await;
809 match next {
810 Ok(Ok(Some(chunk))) => {
811 if preview.len() >= max_bytes {
812 truncated = true;
813 break;
814 }
815 let remaining = max_bytes - preview.len();
816 if chunk.len() > remaining {
817 preview.extend_from_slice(&chunk[..remaining]);
818 truncated = true;
819 break;
820 }
821 preview.extend_from_slice(&chunk);
822 }
823 Ok(Ok(None)) => break,
824 Ok(Err(error)) => {
825 return format!(
826 "<error body unavailable: failed to read response body: {}>",
827 error
828 );
829 }
830 Err(_) => {
831 return format!(
832 "<error body unavailable: read timeout after {:?}>",
833 read_timeout
834 );
835 }
836 }
837 }
838
839 render_error_body_preview(&preview, truncated)
840 }
841}
842
843fn cancelled_request_error_if_needed(
854 token: Option<&CancellationToken>,
855 method: &http::Method,
856 url: &Url,
857 message: &str,
858) -> Option<HttpError> {
859 if token.is_some_and(CancellationToken::is_cancelled) {
860 Some(
861 HttpError::cancelled(message.to_string())
862 .with_method(method.clone())
863 .with_url(url.clone()),
864 )
865 } else {
866 None
867 }
868}
869
870fn clone_request_body_for_log(body: &HttpRequestBody) -> Option<Bytes> {
878 match body {
879 HttpRequestBody::Bytes(bytes)
880 | HttpRequestBody::Json(bytes)
881 | HttpRequestBody::Form(bytes)
882 | HttpRequestBody::Multipart(bytes)
883 | HttpRequestBody::Ndjson(bytes) => Some(bytes.clone()),
884 HttpRequestBody::Text(text) => Some(Bytes::from(text.clone())),
885 HttpRequestBody::Empty => None,
886 }
887}
888
889fn apply_request_body(
898 builder: reqwest::RequestBuilder,
899 body: HttpRequestBody,
900) -> reqwest::RequestBuilder {
901 match body {
902 HttpRequestBody::Empty => builder,
903 HttpRequestBody::Bytes(bytes)
904 | HttpRequestBody::Json(bytes)
905 | HttpRequestBody::Form(bytes)
906 | HttpRequestBody::Multipart(bytes)
907 | HttpRequestBody::Ndjson(bytes) => builder.body(bytes),
908 HttpRequestBody::Text(text) => builder.body(text),
909 }
910}
911
912fn map_retry_result<T>(result: RetryResult<T, HttpError>) -> HttpResult<T> {
925 match result {
926 Ok(value) => Ok(value),
927 Err(RetryError::Aborted { failure, .. }) => map_retry_failure(failure),
928 Err(RetryError::AttemptsExceeded {
929 attempts,
930 max_attempts,
931 last_failure,
932 ..
933 }) => {
934 let mut error = map_retry_failure_to_error(last_failure);
935 error.message = format!(
936 "{} (retry attempts exhausted: {attempts}/{max_attempts})",
937 error.message
938 );
939 Err(error)
940 }
941 Err(RetryError::MaxElapsedExceeded {
942 elapsed,
943 max_elapsed,
944 last_failure: Some(last_failure),
945 ..
946 }) => {
947 let mut error = map_retry_failure_to_error(last_failure);
948 error.message = format!(
949 "{} (retry max duration exceeded: {elapsed:?}/{max_elapsed:?})",
950 error.message
951 );
952 Err(error)
953 }
954 Err(RetryError::MaxElapsedExceeded {
955 elapsed,
956 max_elapsed,
957 last_failure: None,
958 ..
959 }) => Err(HttpError::other(format!(
960 "HTTP retry max duration exceeded before a retryable error was captured: {elapsed:?}/{max_elapsed:?}"
961 ))),
962 }
963}
964
965fn map_retry_failure<T>(failure: AttemptFailure<HttpError>) -> HttpResult<T> {
974 Err(map_retry_failure_to_error(failure))
975}
976
977fn map_retry_failure_to_error(failure: AttemptFailure<HttpError>) -> HttpError {
985 match failure {
986 AttemptFailure::Error(error) => error,
987 AttemptFailure::AttemptTimeout { elapsed, timeout } => HttpError::other(format!(
988 "HTTP retry attempt timeout after {elapsed:?} (timeout: {timeout:?})"
989 )),
990 }
991}
992
993fn map_reqwest_error(
1006 error: reqwest::Error,
1007 default_kind: HttpErrorKind,
1008 method: Option<http::Method>,
1009 url: Option<Url>,
1010) -> HttpError {
1011 let kind = if error.is_timeout() {
1012 classify_reqwest_timeout_kind(&error)
1013 } else if error.is_decode() {
1014 HttpErrorKind::Decode
1015 } else if error.is_status() {
1016 HttpErrorKind::Status
1017 } else if error.is_request() && error.url().is_none() {
1018 HttpErrorKind::InvalidUrl
1019 } else {
1020 default_kind
1021 };
1022
1023 let mut result = HttpError::new(kind, format!("HTTP transport error: {}", error));
1024 if let Some(method) = method {
1025 result = result.with_method(method);
1026 }
1027 if let Some(url) = url {
1028 result = result.with_url(url);
1029 }
1030 result.with_source(error)
1031}
1032
1033fn classify_reqwest_timeout_kind(error: &reqwest::Error) -> HttpErrorKind {
1042 let message = error.to_string().to_ascii_lowercase();
1043 if message.contains("connect") {
1044 HttpErrorKind::ConnectTimeout
1045 } else {
1046 HttpErrorKind::RequestTimeout
1047 }
1048}
1049
1050fn parse_retry_after(status: StatusCode, headers: &HeaderMap) -> Option<Duration> {
1060 if status != StatusCode::TOO_MANY_REQUESTS {
1061 return None;
1062 }
1063 headers
1064 .get(RETRY_AFTER)
1065 .and_then(|value| value.to_str().ok())
1066 .and_then(parse_retry_after_value)
1067}
1068
/// Parses a `Retry-After` header value given as a whole number of seconds.
///
/// Surrounding whitespace is ignored. Only values that parse as `u64` are
/// accepted; any other form (for example a date) yields `None`.
fn parse_retry_after_value(value: &str) -> Option<Duration> {
    value
        .trim()
        .parse::<u64>()
        .ok()
        .map(Duration::from_secs)
}
1080
/// Renders captured error-body bytes as a short human-readable preview.
///
/// - No bytes: `<empty>`.
/// - Valid UTF-8: the text itself.
/// - Truncated input whose only fault is an incomplete multi-byte sequence
///   at the very end: the valid text before that sequence. Cutting a body
///   at a byte limit can split a character, and that must not turn a text
///   body into a binary placeholder.
/// - Anything else: `<binary N bytes>`, where N is the number of captured
///   bytes.
///
/// When `truncated` is true, `...<truncated>` is appended to every form
/// except `<empty>`.
fn render_error_body_preview(bytes: &[u8], truncated: bool) -> String {
    if bytes.is_empty() {
        return "<empty>".to_string();
    }

    let suffix = if truncated { "...<truncated>" } else { "" };
    let text = match std::str::from_utf8(bytes) {
        Ok(text) => Some(text),
        // `error_len() == None` means the input ended in the middle of a
        // character; everything before `valid_up_to()` is valid UTF-8.
        Err(error) if truncated && error.error_len().is_none() => {
            std::str::from_utf8(&bytes[..error.valid_up_to()])
                .ok()
                .filter(|prefix| !prefix.is_empty())
        }
        Err(_) => None,
    };

    match text {
        Some(text) => format!("{text}{suffix}"),
        None => format!("<binary {} bytes>{suffix}", bytes.len()),
    }
}