qubit_http/request/http_request.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Immutable HTTP request object.
11
12use std::future::Future;
13use std::sync::RwLock;
14use std::time::Duration;
15
16use bytes::Bytes;
17use futures_util::stream as futures_stream;
18use http::{
19 HeaderMap,
20 HeaderName,
21 HeaderValue,
22 Method,
23};
24use qubit_function::MutatingFunction;
25use reqwest::Response;
26use tokio_util::sync::CancellationToken;
27use url::Host;
28use url::Url;
29
30use crate::error::{
31 backend_error_mapper::map_reqwest_error,
32 ReqwestErrorPhase,
33};
34use crate::{
35 AsyncHttpHeaderInjector,
36 HttpError,
37 HttpErrorKind,
38 HttpHeaderInjector,
39 HttpLogger,
40 HttpRequestStreamingBody,
41 HttpResult,
42};
43
44use super::http_request_body::HttpRequestBody;
45use super::http_request_builder::HttpRequestBuilder;
46use super::http_request_retry_override::HttpRequestRetryOverride;
47use super::parse_header;
48
/// Per-request execution controls: timeouts, cooperative cancellation, and the
/// retry override.
///
/// Filled from the builder in `HttpRequest::new` and read/written through the
/// timeout, cancellation, and retry accessors on `HttpRequest`.
#[derive(Debug, Clone)]
struct HttpRequestExecutionOptions {
    /// Overrides client-wide request timeout when set; otherwise client default applies.
    request_timeout: Option<Duration>,
    /// Per-request write timeout. `HttpRequest::send_impl` applies it to header
    /// preparation, streaming-body preparation, and the send future itself.
    write_timeout: Duration,
    /// Per-request read timeout used during response body reads.
    read_timeout: Duration,
    /// Optional cancellation token checked before send and during I/O phases.
    cancellation_token: Option<CancellationToken>,
    /// Per-request retry override (enable/disable/method-policy/Retry-After behavior).
    retry_override: HttpRequestRetryOverride,
}
63
/// Client-derived inputs for URL resolution and header assembly.
///
/// All values are copied from the builder in `HttpRequest::new`; the base URL
/// and the IPv4-only flag can be changed afterwards through the request's
/// setters.
#[derive(Debug, Clone)]
struct HttpRequestContext {
    /// Base URL copied from client options, used to resolve relative `path`.
    base_url: Option<Url>,
    /// Whether resolved URLs must avoid IPv6 literal hosts.
    ipv4_only: bool,
    /// Client default headers snapshot captured when this request builder was created.
    /// Forms the first (lowest-priority) layer of the merged outbound headers.
    default_headers: HeaderMap,
    /// Client sync header injectors snapshot captured when this request builder was created.
    /// Applied in order, after the default headers.
    injectors: Vec<HttpHeaderInjector>,
    /// Client async header injectors snapshot captured when this request builder was created.
    /// Applied in order, after the sync injectors.
    async_injectors: Vec<AsyncHttpHeaderInjector>,
}
78
/// Snapshot of a single HTTP call produced by [`crate::HttpRequestBuilder`].
///
/// The snapshot is detached from the builder, but it is not frozen: the
/// `set_*` / `clear_*` methods adjust individual fields and keep the two
/// internal caches (`resolved_url`, `effective_headers`) consistent.
#[derive(Debug)]
pub struct HttpRequest {
    /// HTTP method (GET, POST, …).
    method: Method,
    /// Absolute URL string, or path joined with client `base_url` when not
    /// parseable as URL.
    path: String,
    /// Query string parameters as `(name, value)` pairs.
    query: Vec<(String, String)>,
    /// Headers added on top of client defaults and injector output.
    headers: HeaderMap,
    /// Serialized body variant.
    body: HttpRequestBody,
    /// Deferred per-attempt streaming body factory. When present, the send
    /// path uses it instead of `body`.
    streaming_body: Option<HttpRequestStreamingBody>,
    /// Lazily maintained cache for the currently resolved URL (without the
    /// `query` pairs). Wrapped in a lock because `resolved_url` fills the
    /// cache through a shared `&self` reference.
    resolved_url: RwLock<Option<Url>>,
    /// Attempt-scoped cache of merged outbound headers after applying
    /// defaults/injectors/request-local headers. `None` means "not computed
    /// since the last invalidation".
    effective_headers: Option<HeaderMap>,
    /// Request execution options and runtime controls.
    execution_options: HttpRequestExecutionOptions,
    /// Client-derived context for URL and header resolution.
    context: HttpRequestContext,
}
106
107impl HttpRequest {
108 /// Consumes a finished [`HttpRequestBuilder`] and freezes its fields into
109 /// an [`HttpRequest`].
110 ///
111 /// # Parameters
112 /// - `builder`: Populated builder produced by the HTTP client pipeline.
113 ///
114 /// # Returns
115 /// Snapshot ready for URL resolution, header assembly, and sending.
116 pub(super) fn new(builder: HttpRequestBuilder) -> Self {
117 let mut request = Self {
118 method: builder.method,
119 path: builder.path,
120 query: builder.query,
121 headers: builder.headers,
122 body: builder.body,
123 streaming_body: builder.streaming_body,
124 resolved_url: RwLock::new(None),
125 effective_headers: None,
126 execution_options: HttpRequestExecutionOptions {
127 request_timeout: builder.request_timeout,
128 write_timeout: builder.write_timeout,
129 read_timeout: builder.read_timeout,
130 cancellation_token: builder.cancellation_token,
131 retry_override: builder.retry_override,
132 },
133 context: HttpRequestContext {
134 base_url: builder.base_url,
135 ipv4_only: builder.ipv4_only,
136 default_headers: builder.default_headers,
137 injectors: builder.injectors,
138 async_injectors: builder.async_injectors,
139 },
140 };
141 request.refresh_resolved_url_cache();
142 request
143 }
144
    /// Returns the HTTP method stored on this request.
    ///
    /// # Returns
    /// Borrowed [`Method`] (for example GET or POST).
    pub fn method(&self) -> &Method {
        &self.method
    }
152
    /// Replaces the HTTP method.
    ///
    /// Only the method field is written; the URL and header caches are left
    /// untouched.
    ///
    /// # Parameters
    /// - `method`: New [`Method`].
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_method(&mut self, method: Method) -> &mut Self {
        self.method = method;
        self
    }
164
    /// Returns the raw path or absolute URL string stored on this request.
    ///
    /// # Returns
    /// The string exactly as stored, before base-URL joining and query string
    /// assembly; may be relative if a base URL is set.
    pub fn path(&self) -> &str {
        &self.path
    }
173
174 /// Replaces the path or absolute URL string.
175 ///
176 /// # Parameters
177 /// - `path`: New path or URL string (query string is managed separately via
178 /// [`Self::add_query_param`]).
179 ///
180 /// # Returns
181 /// `self` for method chaining.
182 pub fn set_path(&mut self, path: &str) -> &mut Self {
183 self.path = path.to_string();
184 self.refresh_resolved_url_cache();
185 self
186 }
187
    /// Returns the `(name, value)` query pairs in insertion order; they are
    /// appended to the resolved URL when the request is sent.
    ///
    /// # Returns
    /// Slice view of accumulated query parameters.
    pub fn query(&self) -> &[(String, String)] {
        &self.query
    }
196
197 /// Appends a single query pair preserving insertion order.
198 ///
199 /// # Parameters
200 /// - `key`: Parameter name.
201 /// - `value`: Parameter value.
202 ///
203 /// # Returns
204 /// `self` for method chaining.
205 pub fn add_query_param(&mut self, key: &str, value: &str) -> &mut Self {
206 self.query.push((key.to_string(), value.to_string()));
207 self
208 }
209
    /// Removes every query pair from this snapshot.
    ///
    /// Query pairs already embedded in [`Self::path`] are not affected; only
    /// the separately stored pairs are dropped.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn clear_query_params(&mut self) -> &mut Self {
        self.query.clear();
        self
    }
218
    /// Returns the request-local headers, which are layered on top of client
    /// defaults and injector output at send time.
    ///
    /// # Returns
    /// Borrowed [`HeaderMap`] owned by this request only (not merged defaults).
    pub fn headers(&self) -> &HeaderMap {
        &self.headers
    }
227
228 /// Parses and inserts one header from string name/value pairs.
229 ///
230 /// # Parameters
231 /// - `name`: Header field name.
232 /// - `value`: Header field value.
233 ///
234 /// # Returns
235 /// `Ok(self)` on success.
236 ///
237 /// # Errors
238 /// Returns [`HttpError`] when name or value cannot be converted into valid
239 /// HTTP tokens.
240 pub fn set_header(&mut self, name: &str, value: &str) -> Result<&mut Self, HttpError> {
241 let (header_name, header_value) = parse_header(name, value)?;
242 self.headers.insert(header_name, header_value);
243 self.invalidate_effective_headers_cache();
244 Ok(self)
245 }
246
    /// Inserts one header using pre-validated [`HeaderName`] / [`HeaderValue`]
    /// types and drops the merged-header cache.
    ///
    /// # Parameters
    /// - `name`: Typed header name.
    /// - `value`: Typed header value.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_typed_header(&mut self, name: HeaderName, value: HeaderValue) -> &mut Self {
        self.headers.insert(name, value);
        // The merged headers were built from the previous request-local map.
        self.invalidate_effective_headers_cache();
        self
    }
261
    /// Removes a header field from the request-local map by typed name and
    /// drops the merged-header cache.
    ///
    /// Client defaults and injector output for the same name are not affected.
    ///
    /// # Parameters
    /// - `name`: Header name to strip from the request-local map.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn remove_header(&mut self, name: &HeaderName) -> &mut Self {
        self.headers.remove(name);
        self.invalidate_effective_headers_cache();
        self
    }
274
    /// Clears all request-local headers and drops the merged-header cache.
    ///
    /// Client default headers and injectors stay configured and are applied
    /// again the next time merged headers are computed.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn clear_headers(&mut self) -> &mut Self {
        self.headers.clear();
        self.invalidate_effective_headers_cache();
        self
    }
285
    /// Returns the serialized body variant for this snapshot.
    ///
    /// # Returns
    /// Borrowed [`HttpRequestBody`]. This is [`HttpRequestBody::Empty`] after
    /// [`Self::set_streaming_body`] has been called.
    pub fn body(&self) -> &HttpRequestBody {
        &self.body
    }
293
    /// Replaces the entire body payload.
    ///
    /// Any previously configured streaming body factory is discarded, so the
    /// request carries either a serialized body or a streaming body, not both.
    ///
    /// # Parameters
    /// - `body`: New [`HttpRequestBody`] variant.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_body(&mut self, body: HttpRequestBody) -> &mut Self {
        self.body = body;
        self.streaming_body = None;
        self
    }
306
    /// Sets deferred streaming upload body factory for this request.
    ///
    /// The serialized body is reset to [`HttpRequestBody::Empty`], so the
    /// request carries either a streaming body or a serialized body, not both.
    ///
    /// # Parameters
    /// - `streaming_body`: Deferred body stream factory reused across retries.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_streaming_body(&mut self, streaming_body: HttpRequestStreamingBody) -> &mut Self {
        self.streaming_body = Some(streaming_body);
        self.body = HttpRequestBody::Empty;
        self
    }
319
    /// Returns the per-request total timeout, if any.
    ///
    /// # Returns
    /// `Some(duration)` when a request-specific timeout overrides the client
    /// default; otherwise `None`.
    pub fn request_timeout(&self) -> Option<Duration> {
        self.execution_options.request_timeout
    }
328
    /// Sets a per-request total timeout that overrides the client default for
    /// this send.
    ///
    /// # Parameters
    /// - `timeout`: Upper bound for the entire request lifecycle handled by
    ///   reqwest.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_request_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.execution_options.request_timeout = Some(timeout);
        self
    }
342
    /// Drops the per-request timeout so the client-wide default applies again.
    ///
    /// After this call [`Self::request_timeout`] returns `None`.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn clear_request_timeout(&mut self) -> &mut Self {
        self.execution_options.request_timeout = None;
        self
    }
351
    /// Returns the write-phase timeout used while sending the request.
    ///
    /// # Returns
    /// The configured write timeout as a [`Duration`].
    pub fn write_timeout(&self) -> Duration {
        self.execution_options.write_timeout
    }
356
    /// Sets the write-phase timeout used while sending the request.
    ///
    /// # Parameters
    /// - `timeout`: New write timeout.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_write_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.execution_options.write_timeout = timeout;
        self
    }
362
    /// Returns the read-phase timeout used while reading response body bytes.
    ///
    /// # Returns
    /// The configured read timeout as a [`Duration`].
    pub fn read_timeout(&self) -> Duration {
        self.execution_options.read_timeout
    }
367
    /// Sets the read-phase timeout used while reading response body bytes.
    ///
    /// # Parameters
    /// - `timeout`: New read timeout.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_read_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.execution_options.read_timeout = timeout;
        self
    }
373
    /// Returns the optional base URL used to resolve relative [`Self::path`]
    /// values.
    ///
    /// # Returns
    /// `Some` when a base is configured; `None` when only absolute URLs in
    /// `path` are valid.
    pub fn base_url(&self) -> Option<&Url> {
        self.context.base_url.as_ref()
    }
383
    /// Sets the base URL used for internal URL resolution when `path` is not
    /// absolute, then recomputes the cached resolved URL.
    ///
    /// # Parameters
    /// - `base_url`: Root URL to join against relative paths.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_base_url(&mut self, base_url: Url) -> &mut Self {
        self.context.base_url = Some(base_url);
        // The cached URL may have been joined against the previous base.
        self.refresh_resolved_url_cache();
        self
    }
397
    /// Removes the configured base URL so relative paths can no longer be
    /// resolved without resetting it, then recomputes the cached resolved URL.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn clear_base_url(&mut self) -> &mut Self {
        self.context.base_url = None;
        self.refresh_resolved_url_cache();
        self
    }
408
    /// Returns whether IPv6 literal hosts are rejected after URL resolution.
    ///
    /// # Returns
    /// `true` when a resolved URL whose host is an IPv6 literal must be
    /// rejected with [`HttpError::invalid_url`].
    pub fn ipv4_only(&self) -> bool {
        self.context.ipv4_only
    }
417
    /// Enables or disables IPv6 literal host rejection for resolved URLs, then
    /// recomputes the cached resolved URL under the new setting.
    ///
    /// # Parameters
    /// - `enabled`: When `true`, resolved URLs whose host is an IPv6 literal
    ///   are errors.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_ipv4_only(&mut self, enabled: bool) -> &mut Self {
        self.context.ipv4_only = enabled;
        // A URL accepted under the old setting may be rejected now (or vice versa).
        self.refresh_resolved_url_cache();
        self
    }
431
    /// Returns the cooperative cancellation handle, if configured.
    ///
    /// # Returns
    /// `Some` token checked before send and during I/O; `None` when
    /// cancellation is not wired.
    pub fn cancellation_token(&self) -> Option<&CancellationToken> {
        self.execution_options.cancellation_token.as_ref()
    }
440
    /// Attaches a [`CancellationToken`] that can abort this request
    /// cooperatively, replacing any token set earlier.
    ///
    /// # Parameters
    /// - `token`: Shared cancellation source.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_cancellation_token(&mut self, token: CancellationToken) -> &mut Self {
        self.execution_options.cancellation_token = Some(token);
        self
    }
453
    /// Removes any cancellation token from this snapshot.
    ///
    /// After this call [`Self::cancellation_token`] returns `None`.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn clear_cancellation_token(&mut self) -> &mut Self {
        self.execution_options.cancellation_token = None;
        self
    }
462
    /// Returns the per-request retry override applied by the client pipeline.
    ///
    /// # Returns
    /// Borrowed [`HttpRequestRetryOverride`].
    pub fn retry_override(&self) -> &HttpRequestRetryOverride {
        &self.execution_options.retry_override
    }
470
    /// Replaces the retry override for this single request.
    ///
    /// # Parameters
    /// - `retry_override`: New override policy and knobs.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_retry_override(&mut self, retry_override: HttpRequestRetryOverride) -> &mut Self {
        self.execution_options.retry_override = retry_override;
        self
    }
482
    /// Moves the current body out, leaving [`HttpRequestBody::Empty`] in its
    /// place.
    ///
    /// Used internally before handing the payload to reqwest so the snapshot is
    /// not cloned twice. After this call [`Self::body`] reports
    /// [`HttpRequestBody::Empty`] until a new body is set.
    ///
    /// # Returns
    /// Previous [`HttpRequestBody`] value.
    pub(crate) fn take_body(&mut self) -> HttpRequestBody {
        std::mem::replace(&mut self.body, HttpRequestBody::Empty)
    }
494
495 /// Assembles a reqwest [`RequestBuilder`](reqwest::RequestBuilder), applies
496 /// this snapshot's body, then sends with a bounded write phase.
497 ///
498 /// Centralizes send-attempt preparation and transport wiring:
499 /// - invalidates and recomputes effective headers for each attempt;
500 /// - emits request TRACE logs via the provided logger;
501 /// - applies query/timeout/body wiring plus cooperative cancellation and
502 /// write-timeout handling.
503 ///
504 /// # Parameters
505 /// - `backend`: Shared reqwest client.
506 /// - `logger`: Attempt-scoped request logger.
507 ///
508 /// # Returns
509 /// The successful [`Response`] or a mapped [`HttpError`].
510 ///
511 /// # Errors
512 /// - Cooperative cancellation while waiting on the send future.
513 /// - Transport failures mapped from reqwest.
514 /// - Write timeout when the send future does not complete within
515 /// `write_timeout`.
516 pub(crate) async fn send_impl(
517 &mut self,
518 backend: &reqwest::Client,
519 logger: &HttpLogger<'_>,
520 ) -> HttpResult<Response> {
521 // Effective headers are cached on the request. Each send attempt must
522 // invalidate and recompute them so injector output and request mutations
523 // are refreshed instead of reusing stale headers from prior attempts.
524 self.invalidate_effective_headers_cache();
525 let method = self.method().clone();
526 let request_url_context = self.resolved_url_with_query().ok();
527 let write_timeout = self.execution_options.write_timeout;
528 let cancellation_token = self.execution_options.cancellation_token.clone();
529 let headers = Self::await_pre_send_future(
530 self.effective_headers(),
531 write_timeout,
532 cancellation_token,
533 &method,
534 request_url_context.as_ref(),
535 "Request cancelled while preparing request",
536 format!(
537 "Write timeout after {:?} while preparing request",
538 write_timeout
539 ),
540 )
541 .await?
542 .clone();
543 let url = self.resolved_url()?;
544 let request_url = self.resolved_url_with_query()?;
545 // Log the request after computing effective headers so TRACE logs
546 // include the same query string used by the actual send path.
547 logger.log_request(self);
548 let mut builder = backend.request(method.clone(), url.clone());
549 builder = builder.headers(headers);
550 if !self.query.is_empty() {
551 builder = builder.query(self.query.as_slice());
552 }
553 if let Some(timeout) = self.execution_options.request_timeout {
554 builder = builder.timeout(timeout);
555 }
556 if let Some(streaming_body) = self.streaming_body.as_ref() {
557 let body = Self::await_pre_send_future(
558 async { Ok(streaming_body.to_reqwest_body().await) },
559 self.execution_options.write_timeout,
560 self.execution_options.cancellation_token.clone(),
561 &method,
562 Some(&request_url),
563 "Request cancelled while preparing streaming request body",
564 format!(
565 "Write timeout after {:?} while preparing streaming request body",
566 self.execution_options.write_timeout
567 ),
568 )
569 .await?;
570 builder = builder.body(body);
571 } else {
572 builder = Self::apply_request_body(builder, self.take_body());
573 }
574
575 let send_future =
576 tokio::time::timeout(self.execution_options.write_timeout, builder.send());
577 let next = if let Some(token) = self.execution_options.cancellation_token.as_ref() {
578 tokio::select! {
579 _ = token.cancelled() => {
580 return Err(HttpError::cancelled("Request cancelled while sending")
581 .with_method(&method)
582 .with_url(&request_url));
583 }
584 send_result = send_future => send_result,
585 }
586 } else {
587 send_future.await
588 };
589
590 match next {
591 Ok(Ok(response)) => Ok(response),
592 Ok(Err(error)) => Err(map_reqwest_error(
593 error,
594 HttpErrorKind::Transport,
595 ReqwestErrorPhase::Send,
596 Some(method.clone()),
597 Some(request_url.clone()),
598 )),
599 Err(_) => Err(HttpError::write_timeout(format!(
600 "Write timeout after {:?} while sending request",
601 self.execution_options.write_timeout
602 ))
603 .with_method(&method)
604 .with_url(&request_url)),
605 }
606 }
607
608 /// Waits for one asynchronous pre-send preparation step with cancellation and
609 /// write-timeout handling.
610 ///
611 /// # Parameters
612 /// - `future`: Preparation future, such as async header injection or streaming
613 /// body factory execution.
614 /// - `write_timeout`: Timeout budget reused for send preparation.
615 /// - `cancellation_token`: Optional request cancellation token.
616 /// - `method`: Request method for error context.
617 /// - `request_url`: Optional resolved request URL for error context.
618 /// - `cancellation_message`: Message used when cancellation wins.
619 /// - `timeout_message`: Message used when timeout wins.
620 ///
621 /// # Returns
622 /// The future output when it completes before cancellation or timeout.
623 ///
624 /// # Errors
625 /// Returns [`HttpErrorKind::Cancelled`] on cancellation,
626 /// [`HttpErrorKind::WriteTimeout`] on timeout, or propagates the future's own
627 /// error.
628 async fn await_pre_send_future<T, F>(
629 future: F,
630 write_timeout: Duration,
631 cancellation_token: Option<CancellationToken>,
632 method: &Method,
633 request_url: Option<&Url>,
634 cancellation_message: &str,
635 timeout_message: String,
636 ) -> HttpResult<T>
637 where
638 F: Future<Output = HttpResult<T>>,
639 {
640 let timed = tokio::time::timeout(write_timeout, future);
641 let next = if let Some(token) = cancellation_token.as_ref() {
642 tokio::select! {
643 _ = token.cancelled() => {
644 return Err(Self::pre_send_cancelled_error(
645 cancellation_message,
646 method,
647 request_url,
648 ));
649 }
650 result = timed => result,
651 }
652 } else {
653 timed.await
654 };
655
656 match next {
657 Ok(result) => result,
658 Err(_) => Err(Self::pre_send_write_timeout_error(
659 timeout_message,
660 method,
661 request_url,
662 )),
663 }
664 }
665
666 /// Builds a cancellation error for pre-send preparation.
667 ///
668 /// # Parameters
669 /// - `message`: Cancellation message.
670 /// - `method`: Request method for context.
671 /// - `request_url`: Optional request URL for context.
672 ///
673 /// # Returns
674 /// Cancellation [`HttpError`] with request context attached.
675 fn pre_send_cancelled_error(
676 message: &str,
677 method: &Method,
678 request_url: Option<&Url>,
679 ) -> HttpError {
680 let mut error = HttpError::cancelled(message).with_method(method);
681 if let Some(request_url) = request_url {
682 error = error.with_url(request_url);
683 }
684 error
685 }
686
687 /// Builds a write-timeout error for pre-send preparation.
688 ///
689 /// # Parameters
690 /// - `message`: Timeout message.
691 /// - `method`: Request method for context.
692 /// - `request_url`: Optional request URL for context.
693 ///
694 /// # Returns
695 /// Write-timeout [`HttpError`] with request context attached.
696 fn pre_send_write_timeout_error(
697 message: String,
698 method: &Method,
699 request_url: Option<&Url>,
700 ) -> HttpError {
701 let mut error = HttpError::write_timeout(message).with_method(method);
702 if let Some(request_url) = request_url {
703 error = error.with_url(request_url);
704 }
705 error
706 }
707
708 /// Returns the resolved URL for current request fields, computing and
709 /// caching it on demand.
710 ///
711 /// # Returns
712 /// Resolved [`Url`] value (cloned from cache when already computed).
713 ///
714 /// # Errors
715 /// Returns [`HttpError::invalid_url`] when parsing fails, the base URL is
716 /// missing for a relative path, joining fails, or [`Self::ipv4_only`]
717 /// rejects an IPv6 literal host.
718 pub(crate) fn resolved_url(&self) -> Result<Url, HttpError> {
719 let cached = match self.resolved_url.read() {
720 Ok(guard) => guard.clone(),
721 Err(_) => return Err(HttpError::other("Resolved URL cache read lock poisoned")),
722 };
723 if let Some(url) = cached.as_ref() {
724 return Ok(url.clone());
725 }
726 let resolved = self.compute_resolved_url()?;
727 match self.resolved_url.write() {
728 Ok(mut guard) => *guard = Some(resolved.clone()),
729 Err(_) => return Err(HttpError::other("Resolved URL cache write lock poisoned")),
730 }
731 Ok(resolved)
732 }
733
734 /// Returns the resolved URL plus request-builder query parameters.
735 ///
736 /// This mirrors the URL sent by [`Self::send_impl`]: query pairs already
737 /// present in [`Self::path`] are preserved, and pairs from
738 /// [`Self::query`] are appended in insertion order.
739 ///
740 /// # Returns
741 /// Resolved [`Url`] including query parameters from this request snapshot.
742 ///
743 /// # Errors
744 /// Propagates [`Self::resolved_url`] errors for invalid or unresolved URLs.
745 pub(crate) fn resolved_url_with_query(&self) -> Result<Url, HttpError> {
746 let mut url = self.resolved_url()?;
747 if !self.query.is_empty() {
748 {
749 let mut pairs = url.query_pairs_mut();
750 for (key, value) in &self.query {
751 pairs.append_pair(key, value);
752 }
753 }
754 }
755 Ok(url)
756 }
757
758 /// Returns cached resolved URL when available.
759 pub fn resolved_url_cached(&self) -> Option<Url> {
760 self.resolved_url
761 .read()
762 .map(|guard| guard.clone())
763 .unwrap_or_default()
764 }
765
766 /// Recomputes and stores the current resolved URL.
767 fn refresh_resolved_url_cache(&mut self) {
768 if let Ok(mut guard) = self.resolved_url.write() {
769 *guard = self.compute_resolved_url().ok();
770 }
771 }
772
773 /// Computes the resolved URL from current path/base/ipv4 settings.
774 fn compute_resolved_url(&self) -> Result<Url, HttpError> {
775 if let Ok(url) = Url::parse(&self.path) {
776 self.validate_resolved_url_host(&url)?;
777 return Ok(url);
778 }
779
780 let base = self.context.base_url.as_ref().ok_or_else(|| {
781 HttpError::invalid_url(format!(
782 "Cannot resolve relative path '{}' without base_url",
783 self.path
784 ))
785 })?;
786
787 let url = base.join(&self.path).map_err(|error| {
788 HttpError::invalid_url(format!(
789 "Failed to resolve path '{}' against base URL '{}': {}",
790 self.path, base, error
791 ))
792 })?;
793 self.validate_resolved_url_host(&url)?;
794 Ok(url)
795 }
796
797 /// Enforces [`Self::ipv4_only`] by rejecting IPv6 literal hosts in `url`.
798 ///
799 /// # Parameters
800 /// - `url`: Candidate URL after parsing or joining.
801 ///
802 /// # Returns
803 /// `Ok(())` when the host is acceptable.
804 ///
805 /// # Errors
806 /// [`HttpError::invalid_url`] when `ipv4_only` is `true` and the host is an
807 /// IPv6 literal.
808 fn validate_resolved_url_host(&self, url: &Url) -> Result<(), HttpError> {
809 if self.context.ipv4_only && matches!(url.host(), Some(Host::Ipv6(_))) {
810 return Err(HttpError::invalid_url(format!(
811 "IPv6 literal host is not allowed when ipv4_only=true: {}",
812 url
813 )));
814 }
815 Ok(())
816 }
817
818 /// Returns the attempt-scoped merged outbound headers.
819 ///
820 /// On first call after invalidation, this computes merged headers by
821 /// replaying defaults/injectors/request-local headers and stores them in
822 /// [`Self::effective_headers`]. Later calls in the same attempt return the
823 /// cached map.
824 ///
825 /// Why this API is async:
826 /// - async injectors are part of header assembly and may perform awaitable
827 /// work (for example token refresh or other I/O-backed value resolution).
828 /// - therefore header materialization cannot be fully synchronous.
829 ///
830 /// Merge order (later wins on duplicates):
831 /// 1. Client default headers snapshot captured when the builder was
832 /// created.
833 /// 2. Synchronous injector output in registration order.
834 /// 3. Asynchronous injector output in registration order.
835 /// 4. Request-local headers from this snapshot.
836 ///
837 /// # Returns
838 /// Borrowed merged [`HeaderMap`] from the cache.
839 ///
840 /// # Errors
841 /// Propagates failures returned by any injector's `apply` implementation.
842 pub(crate) async fn effective_headers(&mut self) -> HttpResult<&HeaderMap> {
843 if self.effective_headers.is_none() {
844 self.effective_headers = Some(self.compute_effective_headers().await?);
845 }
846 Ok(self
847 .effective_headers
848 .as_ref()
849 .expect("effective headers cache must be populated after computation"))
850 }
851
    /// Returns cached merged outbound headers when available.
    ///
    /// # Returns
    /// `Some` when the merged headers have been computed since the last
    /// invalidation; `None` otherwise. Nothing is computed by this call.
    pub(crate) fn effective_headers_cached(&self) -> Option<&HeaderMap> {
        self.effective_headers.as_ref()
    }
856
    /// Clears the effective-header cache.
    ///
    /// This method resets the cached merged headers to `None` so the next call
    /// to [`Self::effective_headers`] recomputes them by re-running defaults
    /// and header injectors.
    ///
    /// Why this is needed:
    /// - request-local headers may have been mutated (`set_header`, `clear_headers`, etc.);
    /// - injector output may be time-sensitive (for example rotating auth token
    ///   or timestamp-based signatures), so each send attempt should recompute
    ///   merged headers instead of reusing stale values from prior attempts.
    ///
    /// When to call:
    /// - immediately before starting a new send attempt;
    /// - after any mutation that can change final outbound headers.
    pub(crate) fn invalidate_effective_headers_cache(&mut self) {
        self.effective_headers = None;
    }
875
876 /// Computes merged outbound headers without touching the cache.
877 async fn compute_effective_headers(&self) -> HttpResult<HeaderMap> {
878 let mut headers = self.context.default_headers.clone();
879
880 for injector in &self.context.injectors {
881 injector.apply(&mut headers)?;
882 }
883 for injector in &self.context.async_injectors {
884 injector.apply(&mut headers).await?;
885 }
886
887 headers.extend(self.headers.clone());
888 Ok(headers)
889 }
890
891 /// Returns a pre-cancelled [`HttpError`] when a token is present and
892 /// already cancelled.
893 ///
894 /// # Parameters
895 /// - `message`: Human-readable cancellation reason.
896 ///
897 /// # Returns
898 /// `Some` [`HttpError`] (including method context and cached URL when
899 /// available) when a token exists and is already cancelled; otherwise
900 /// `None`.
901 pub(crate) fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
902 if self
903 .execution_options
904 .cancellation_token
905 .as_ref()
906 .is_some_and(CancellationToken::is_cancelled)
907 {
908 let mut error = HttpError::cancelled(message.to_string()).with_method(&self.method);
909 if let Ok(url) = self.resolved_url_with_query() {
910 error = error.with_url(&url);
911 }
912 Some(error)
913 } else {
914 None
915 }
916 }
917
918 /// Attaches the correct reqwest body encoding for each [`HttpRequestBody`]
919 /// variant.
920 ///
921 /// # Parameters
922 /// - `builder`: Partially configured [`reqwest::RequestBuilder`]
923 /// (method/URL/headers already set).
924 /// - `body`: Payload variant to attach; moved into the builder.
925 ///
926 /// # Returns
927 /// The same builder with an appropriate `.body(...)` applied (or unchanged
928 /// for [`HttpRequestBody::Empty`]).
929 fn apply_request_body(
930 builder: reqwest::RequestBuilder,
931 body: HttpRequestBody,
932 ) -> reqwest::RequestBuilder {
933 match body {
934 HttpRequestBody::Empty => builder,
935 HttpRequestBody::Bytes(bytes)
936 | HttpRequestBody::Json(bytes)
937 | HttpRequestBody::Form(bytes)
938 | HttpRequestBody::Multipart(bytes)
939 | HttpRequestBody::Ndjson(bytes) => builder.body(bytes),
940 HttpRequestBody::Stream(chunks) => {
941 let body_stream = futures_stream::iter(
942 chunks.into_iter().map(Result::<Bytes, std::io::Error>::Ok),
943 );
944 builder.body(reqwest::Body::wrap_stream(body_stream))
945 }
946 HttpRequestBody::Text(text) => builder.body(text),
947 }
948 }
949}
950
951impl Clone for HttpRequest {
952 fn clone(&self) -> Self {
953 Self {
954 method: self.method.clone(),
955 path: self.path.clone(),
956 query: self.query.clone(),
957 headers: self.headers.clone(),
958 body: self.body.clone(),
959 streaming_body: self.streaming_body.clone(),
960 resolved_url: RwLock::new(self.resolved_url_cached()),
961 effective_headers: self.effective_headers.clone(),
962 execution_options: self.execution_options.clone(),
963 context: self.context.clone(),
964 }
965 }
966}