Skip to main content

qubit_http/request/
http_request.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Immutable HTTP request object.
11
12use std::future::Future;
13use std::sync::RwLock;
14use std::time::Duration;
15
16use bytes::Bytes;
17use futures_util::stream as futures_stream;
18use http::{HeaderMap, HeaderName, HeaderValue, Method};
19use qubit_function::MutatingFunction;
20use reqwest::Response;
21use tokio_util::sync::CancellationToken;
22use url::Host;
23use url::Url;
24
25use crate::error::{backend_error_mapper::map_reqwest_error, ReqwestErrorPhase};
26use crate::{
27    AsyncHttpHeaderInjector, HttpError, HttpErrorKind, HttpHeaderInjector, HttpLogger,
28    HttpRequestStreamingBody, HttpResult,
29};
30
31use super::http_request_body::HttpRequestBody;
32use super::http_request_builder::HttpRequestBuilder;
33use super::http_request_retry_override::HttpRequestRetryOverride;
34use super::parse_header;
35
36/// Request execution options (timeouts, cancellation, and retry override).
37#[derive(Debug, Clone)]
38struct HttpRequestExecutionOptions {
39    /// Overrides client-wide request timeout when set; otherwise client default applies.
40    request_timeout: Option<Duration>,
41    /// Per-request write timeout used during request sending.
42    write_timeout: Duration,
43    /// Per-request read timeout used during response body reads.
44    read_timeout: Duration,
45    /// Optional cancellation token checked before send and during I/O phases.
46    cancellation_token: Option<CancellationToken>,
47    /// Per-request retry override (enable/disable/method-policy/Retry-After behavior).
48    retry_override: HttpRequestRetryOverride,
49}
50
51/// Request context captured from the originating client.
52#[derive(Debug, Clone)]
53struct HttpRequestContext {
54    /// Base URL copied from client options, used to resolve relative `path`.
55    base_url: Option<Url>,
56    /// Whether resolved URLs must avoid IPv6 literal hosts.
57    ipv4_only: bool,
58    /// Client default headers snapshot captured when this request builder was created.
59    default_headers: HeaderMap,
60    /// Client sync header injectors snapshot captured when this request builder was created.
61    injectors: Vec<HttpHeaderInjector>,
62    /// Client async header injectors snapshot captured when this request builder was created.
63    async_injectors: Vec<AsyncHttpHeaderInjector>,
64}
65
66/// Immutable snapshot of a single HTTP call produced by
67/// [`crate::HttpRequestBuilder`].
68#[derive(Debug)]
69pub struct HttpRequest {
70    /// HTTP method (GET, POST, …).
71    method: Method,
72    /// Absolute URL string, or path joined with client `base_url` when not
73    /// parseable as URL.
74    path: String,
75    /// Query string parameters as `(name, value)` pairs.
76    query: Vec<(String, String)>,
77    /// Headers added on top of client defaults and injector output.
78    headers: HeaderMap,
79    /// Serialized body variant.
80    body: HttpRequestBody,
81    /// Deferred per-attempt streaming body factory.
82    streaming_body: Option<HttpRequestStreamingBody>,
83    /// Lazily maintained cache for the currently resolved URL.
84    resolved_url: RwLock<Option<Url>>,
85    /// Attempt-scoped cache of merged outbound headers after applying
86    /// defaults/injectors/request-local headers.
87    effective_headers: Option<HeaderMap>,
88    /// Request execution options and runtime controls.
89    execution_options: HttpRequestExecutionOptions,
90    /// Client-derived context for URL and header resolution.
91    context: HttpRequestContext,
92}
93
94impl HttpRequest {
95    /// Consumes a finished [`HttpRequestBuilder`] and freezes its fields into
96    /// an [`HttpRequest`].
97    ///
98    /// # Parameters
99    /// - `builder`: Populated builder produced by the HTTP client pipeline.
100    ///
101    /// # Returns
102    /// Snapshot ready for URL resolution, header assembly, and sending.
103    pub(super) fn new(builder: HttpRequestBuilder) -> Self {
104        let mut request = Self {
105            method: builder.method,
106            path: builder.path,
107            query: builder.query,
108            headers: builder.headers,
109            body: builder.body,
110            streaming_body: builder.streaming_body,
111            resolved_url: RwLock::new(None),
112            effective_headers: None,
113            execution_options: HttpRequestExecutionOptions {
114                request_timeout: builder.request_timeout,
115                write_timeout: builder.write_timeout,
116                read_timeout: builder.read_timeout,
117                cancellation_token: builder.cancellation_token,
118                retry_override: builder.retry_override,
119            },
120            context: HttpRequestContext {
121                base_url: builder.base_url,
122                ipv4_only: builder.ipv4_only,
123                default_headers: builder.default_headers,
124                injectors: builder.injectors,
125                async_injectors: builder.async_injectors,
126            },
127        };
128        request.refresh_resolved_url_cache();
129        request
130    }
131
132    /// Returns the HTTP verb for this snapshot.
133    ///
134    /// # Returns
135    /// Borrowed [`Method`] (for example GET or POST).
136    pub fn method(&self) -> &Method {
137        &self.method
138    }
139
140    /// Replaces the HTTP verb.
141    ///
142    /// # Parameters
143    /// - `method`: New [`Method`].
144    ///
145    /// # Returns
146    /// `self` for method chaining.
147    pub fn set_method(&mut self, method: Method) -> &mut Self {
148        self.method = method;
149        self
150    }
151
152    /// Returns the path segment or absolute URL string stored on this request.
153    ///
154    /// # Returns
155    /// The raw path/URL before query string assembly; may be relative if a base
156    /// URL is set.
157    pub fn path(&self) -> &str {
158        &self.path
159    }
160
161    /// Replaces the path or absolute URL string.
162    ///
163    /// # Parameters
164    /// - `path`: New path or URL string (query string is managed separately via
165    ///   [`Self::add_query_param`]).
166    ///
167    /// # Returns
168    /// `self` for method chaining.
169    pub fn set_path(&mut self, path: &str) -> &mut Self {
170        self.path = path.to_string();
171        self.refresh_resolved_url_cache();
172        self
173    }
174
175    /// Returns ordered `(name, value)` query pairs that will be appended to the
176    /// resolved URL.
177    ///
178    /// # Returns
179    /// Slice view of accumulated query parameters.
180    pub fn query(&self) -> &[(String, String)] {
181        &self.query
182    }
183
184    /// Appends a single query pair preserving insertion order.
185    ///
186    /// # Parameters
187    /// - `key`: Parameter name.
188    /// - `value`: Parameter value.
189    ///
190    /// # Returns
191    /// `self` for method chaining.
192    pub fn add_query_param(&mut self, key: &str, value: &str) -> &mut Self {
193        self.query.push((key.to_string(), value.to_string()));
194        self
195    }
196
197    /// Removes every query pair from this snapshot.
198    ///
199    /// # Returns
200    /// `self` for method chaining.
201    pub fn clear_query_params(&mut self) -> &mut Self {
202        self.query.clear();
203        self
204    }
205
206    /// Returns request-local headers layered on top of client defaults and
207    /// injector output at send time.
208    ///
209    /// # Returns
210    /// Borrowed [`HeaderMap`] owned by this request only (not merged defaults).
211    pub fn headers(&self) -> &HeaderMap {
212        &self.headers
213    }
214
215    /// Parses and inserts one header from string name/value pairs.
216    ///
217    /// # Parameters
218    /// - `name`: Header field name.
219    /// - `value`: Header field value.
220    ///
221    /// # Returns
222    /// `Ok(self)` on success.
223    ///
224    /// # Errors
225    /// Returns [`HttpError`] when name or value cannot be converted into valid
226    /// HTTP tokens.
227    pub fn set_header(&mut self, name: &str, value: &str) -> Result<&mut Self, HttpError> {
228        let (header_name, header_value) = parse_header(name, value)?;
229        self.headers.insert(header_name, header_value);
230        self.invalidate_effective_headers_cache();
231        Ok(self)
232    }
233
234    /// Inserts one header using pre-validated [`HeaderName`] / [`HeaderValue`]
235    /// types.
236    ///
237    /// # Parameters
238    /// - `name`: Typed header name.
239    /// - `value`: Typed header value.
240    ///
241    /// # Returns
242    /// `self` for method chaining.
243    pub fn set_typed_header(&mut self, name: HeaderName, value: HeaderValue) -> &mut Self {
244        self.headers.insert(name, value);
245        self.invalidate_effective_headers_cache();
246        self
247    }
248
249    /// Removes all values for a header field by typed name.
250    ///
251    /// # Parameters
252    /// - `name`: Header name to strip from the request-local map.
253    ///
254    /// # Returns
255    /// `self` for method chaining.
256    pub fn remove_header(&mut self, name: &HeaderName) -> &mut Self {
257        self.headers.remove(name);
258        self.invalidate_effective_headers_cache();
259        self
260    }
261
262    /// Clears all request-local headers (defaults and injectors are unaffected
263    /// until send).
264    ///
265    /// # Returns
266    /// `self` for method chaining.
267    pub fn clear_headers(&mut self) -> &mut Self {
268        self.headers.clear();
269        self.invalidate_effective_headers_cache();
270        self
271    }
272
273    /// Returns the serialized body variant for this snapshot.
274    ///
275    /// # Returns
276    /// Borrowed [`HttpRequestBody`].
277    pub fn body(&self) -> &HttpRequestBody {
278        &self.body
279    }
280
281    /// Replaces the entire body payload.
282    ///
283    /// # Parameters
284    /// - `body`: New [`HttpRequestBody`] variant.
285    ///
286    /// # Returns
287    /// `self` for method chaining.
288    pub fn set_body(&mut self, body: HttpRequestBody) -> &mut Self {
289        self.body = body;
290        self.streaming_body = None;
291        self
292    }
293
294    /// Sets deferred streaming upload body factory for this request.
295    ///
296    /// # Parameters
297    /// - `streaming_body`: Deferred body stream factory reused across retries.
298    ///
299    /// # Returns
300    /// `self` for method chaining.
301    pub fn set_streaming_body(&mut self, streaming_body: HttpRequestStreamingBody) -> &mut Self {
302        self.streaming_body = Some(streaming_body);
303        self.body = HttpRequestBody::Empty;
304        self
305    }
306
307    /// Returns the per-request total timeout, if any.
308    ///
309    /// # Returns
310    /// `Some(duration)` when a request-specific timeout overrides the client
311    /// default; otherwise `None`.
312    pub fn request_timeout(&self) -> Option<Duration> {
313        self.execution_options.request_timeout
314    }
315
316    /// Sets a per-request total timeout that overrides the client default for
317    /// this send.
318    ///
319    /// # Parameters
320    /// - `timeout`: Upper bound for the entire request lifecycle handled by
321    ///   reqwest.
322    ///
323    /// # Returns
324    /// `self` for method chaining.
325    pub fn set_request_timeout(&mut self, timeout: Duration) -> &mut Self {
326        self.execution_options.request_timeout = Some(timeout);
327        self
328    }
329
330    /// Drops the per-request timeout so the client-wide default applies again.
331    ///
332    /// # Returns
333    /// `self` for method chaining.
334    pub fn clear_request_timeout(&mut self) -> &mut Self {
335        self.execution_options.request_timeout = None;
336        self
337    }
338
339    /// Returns the write-phase timeout used while sending the request.
340    pub fn write_timeout(&self) -> Duration {
341        self.execution_options.write_timeout
342    }
343
344    /// Sets the write-phase timeout used while sending the request.
345    pub fn set_write_timeout(&mut self, timeout: Duration) -> &mut Self {
346        self.execution_options.write_timeout = timeout;
347        self
348    }
349
350    /// Returns the read-phase timeout used while reading response body bytes.
351    pub fn read_timeout(&self) -> Duration {
352        self.execution_options.read_timeout
353    }
354
355    /// Sets the read-phase timeout used while reading response body bytes.
356    pub fn set_read_timeout(&mut self, timeout: Duration) -> &mut Self {
357        self.execution_options.read_timeout = timeout;
358        self
359    }
360
361    /// Returns the optional base URL used to resolve relative [`Self::path`]
362    /// values.
363    ///
364    /// # Returns
365    /// `Some` when a base is configured; `None` when only absolute URLs in
366    /// `path` are valid.
367    pub fn base_url(&self) -> Option<&Url> {
368        self.context.base_url.as_ref()
369    }
370
371    /// Sets the base URL used for internal URL resolution when `path` is not
372    /// absolute.
373    ///
374    /// # Parameters
375    /// - `base_url`: Root URL to join against relative paths.
376    ///
377    /// # Returns
378    /// `self` for method chaining.
379    pub fn set_base_url(&mut self, base_url: Url) -> &mut Self {
380        self.context.base_url = Some(base_url);
381        self.refresh_resolved_url_cache();
382        self
383    }
384
385    /// Removes the configured base URL so relative paths can no longer be
386    /// resolved without resetting it.
387    ///
388    /// # Returns
389    /// `self` for method chaining.
390    pub fn clear_base_url(&mut self) -> &mut Self {
391        self.context.base_url = None;
392        self.refresh_resolved_url_cache();
393        self
394    }
395
396    /// Returns whether IPv6 literal hosts are rejected after URL resolution.
397    ///
398    /// # Returns
399    /// `true` when a resolved URL whose host is an IPv6 literal must be
400    /// rejected with [`HttpError::invalid_url`].
401    pub fn ipv4_only(&self) -> bool {
402        self.context.ipv4_only
403    }
404
405    /// Enables or disables IPv6 literal host rejection for resolved URLs.
406    ///
407    /// # Parameters
408    /// - `enabled`: When `true`, resolved URLs whose host is an IPv6 literal
409    ///   are errors.
410    ///
411    /// # Returns
412    /// `self` for method chaining.
413    pub fn set_ipv4_only(&mut self, enabled: bool) -> &mut Self {
414        self.context.ipv4_only = enabled;
415        self.refresh_resolved_url_cache();
416        self
417    }
418
419    /// Returns the cooperative cancellation handle, if configured.
420    ///
421    /// # Returns
422    /// `Some` token checked before send and during I/O; `None` when
423    /// cancellation is not wired.
424    pub fn cancellation_token(&self) -> Option<&CancellationToken> {
425        self.execution_options.cancellation_token.as_ref()
426    }
427
428    /// Attaches a [`CancellationToken`] that can abort this request
429    /// cooperatively.
430    ///
431    /// # Parameters
432    /// - `token`: Shared cancellation source.
433    ///
434    /// # Returns
435    /// `self` for method chaining.
436    pub fn set_cancellation_token(&mut self, token: CancellationToken) -> &mut Self {
437        self.execution_options.cancellation_token = Some(token);
438        self
439    }
440
441    /// Removes any cancellation token from this snapshot.
442    ///
443    /// # Returns
444    /// `self` for method chaining.
445    pub fn clear_cancellation_token(&mut self) -> &mut Self {
446        self.execution_options.cancellation_token = None;
447        self
448    }
449
450    /// Returns the per-request retry override applied by the client pipeline.
451    ///
452    /// # Returns
453    /// Borrowed [`HttpRequestRetryOverride`].
454    pub fn retry_override(&self) -> &HttpRequestRetryOverride {
455        &self.execution_options.retry_override
456    }
457
458    /// Replaces the retry override for this single request.
459    ///
460    /// # Parameters
461    /// - `retry_override`: New override policy and knobs.
462    ///
463    /// # Returns
464    /// `self` for method chaining.
465    pub fn set_retry_override(&mut self, retry_override: HttpRequestRetryOverride) -> &mut Self {
466        self.execution_options.retry_override = retry_override;
467        self
468    }
469
470    /// Moves the current body out, leaving [`HttpRequestBody::Empty`] in its
471    /// place.
472    ///
473    /// Used internally before handing the payload to reqwest so the snapshot is
474    /// not cloned twice.
475    ///
476    /// # Returns
477    /// Previous [`HttpRequestBody`] value.
478    pub(crate) fn take_body(&mut self) -> HttpRequestBody {
479        std::mem::replace(&mut self.body, HttpRequestBody::Empty)
480    }
481
482    /// Assembles a reqwest [`RequestBuilder`](reqwest::RequestBuilder), applies
483    /// this snapshot's body, then sends with a bounded write phase.
484    ///
485    /// Centralizes send-attempt preparation and transport wiring:
486    /// - invalidates and recomputes effective headers for each attempt;
487    /// - emits request TRACE logs via the provided logger;
488    /// - applies query/timeout/body wiring plus cooperative cancellation and
489    ///   write-timeout handling.
490    ///
491    /// # Parameters
492    /// - `backend`: Shared reqwest client.
493    /// - `logger`: Attempt-scoped request logger.
494    ///
495    /// # Returns
496    /// The successful [`Response`] or a mapped [`HttpError`].
497    ///
498    /// # Errors
499    /// - Cooperative cancellation while waiting on the send future.
500    /// - Transport failures mapped from reqwest.
501    /// - Write timeout when the send future does not complete within
502    ///   `write_timeout`.
503    pub(crate) async fn send_impl(
504        &mut self,
505        backend: &reqwest::Client,
506        logger: &HttpLogger<'_>,
507    ) -> HttpResult<Response> {
508        // Effective headers are cached on the request. Each send attempt must
509        // invalidate and recompute them so injector output and request mutations
510        // are refreshed instead of reusing stale headers from prior attempts.
511        self.invalidate_effective_headers_cache();
512        let method = self.method().clone();
513        let request_url_context = self.resolved_url_with_query().ok();
514        let write_timeout = self.execution_options.write_timeout;
515        let cancellation_token = self.execution_options.cancellation_token.clone();
516        let headers = Self::await_pre_send_future(
517            self.effective_headers(),
518            write_timeout,
519            cancellation_token,
520            &method,
521            request_url_context.as_ref(),
522            "Request cancelled while preparing request",
523            format!(
524                "Write timeout after {:?} while preparing request",
525                write_timeout
526            ),
527        )
528        .await?
529        .clone();
530        let url = self.resolved_url()?;
531        let request_url = self.resolved_url_with_query()?;
532        // Log the request after computing effective headers so TRACE logs
533        // include the same query string used by the actual send path.
534        logger.log_request(self);
535        let mut builder = backend.request(method.clone(), url.clone());
536        builder = builder.headers(headers);
537        if !self.query.is_empty() {
538            builder = builder.query(self.query.as_slice());
539        }
540        if let Some(timeout) = self.execution_options.request_timeout {
541            builder = builder.timeout(timeout);
542        }
543        if let Some(streaming_body) = self.streaming_body.as_ref() {
544            let body = Self::await_pre_send_future(
545                async { Ok(streaming_body.to_reqwest_body().await) },
546                self.execution_options.write_timeout,
547                self.execution_options.cancellation_token.clone(),
548                &method,
549                Some(&request_url),
550                "Request cancelled while preparing streaming request body",
551                format!(
552                    "Write timeout after {:?} while preparing streaming request body",
553                    self.execution_options.write_timeout
554                ),
555            )
556            .await?;
557            builder = builder.body(body);
558        } else {
559            builder = Self::apply_request_body(builder, self.take_body());
560        }
561
562        let send_future =
563            tokio::time::timeout(self.execution_options.write_timeout, builder.send());
564        let next = if let Some(token) = self.execution_options.cancellation_token.as_ref() {
565            tokio::select! {
566                _ = token.cancelled() => {
567                    return Err(HttpError::cancelled("Request cancelled while sending")
568                        .with_method(&method)
569                        .with_url(&request_url));
570                }
571                send_result = send_future => send_result,
572            }
573        } else {
574            send_future.await
575        };
576
577        match next {
578            Ok(Ok(response)) => Ok(response),
579            Ok(Err(error)) => Err(map_reqwest_error(
580                error,
581                HttpErrorKind::Transport,
582                Some(ReqwestErrorPhase::Send),
583                Some(method.clone()),
584                Some(request_url.clone()),
585            )),
586            Err(_) => Err(HttpError::write_timeout(format!(
587                "Write timeout after {:?} while sending request",
588                self.execution_options.write_timeout
589            ))
590            .with_method(&method)
591            .with_url(&request_url)),
592        }
593    }
594
595    /// Waits for one asynchronous pre-send preparation step with cancellation and
596    /// write-timeout handling.
597    ///
598    /// # Parameters
599    /// - `future`: Preparation future, such as async header injection or streaming
600    ///   body factory execution.
601    /// - `write_timeout`: Timeout budget reused for send preparation.
602    /// - `cancellation_token`: Optional request cancellation token.
603    /// - `method`: Request method for error context.
604    /// - `request_url`: Optional resolved request URL for error context.
605    /// - `cancellation_message`: Message used when cancellation wins.
606    /// - `timeout_message`: Message used when timeout wins.
607    ///
608    /// # Returns
609    /// The future output when it completes before cancellation or timeout.
610    ///
611    /// # Errors
612    /// Returns [`HttpErrorKind::Cancelled`] on cancellation,
613    /// [`HttpErrorKind::WriteTimeout`] on timeout, or propagates the future's own
614    /// error.
615    async fn await_pre_send_future<T, F>(
616        future: F,
617        write_timeout: Duration,
618        cancellation_token: Option<CancellationToken>,
619        method: &Method,
620        request_url: Option<&Url>,
621        cancellation_message: &str,
622        timeout_message: String,
623    ) -> HttpResult<T>
624    where
625        F: Future<Output = HttpResult<T>>,
626    {
627        let timed = tokio::time::timeout(write_timeout, future);
628        let next = if let Some(token) = cancellation_token.as_ref() {
629            tokio::select! {
630                _ = token.cancelled() => {
631                    return Err(Self::pre_send_cancelled_error(
632                        cancellation_message,
633                        method,
634                        request_url,
635                    ));
636                }
637                result = timed => result,
638            }
639        } else {
640            timed.await
641        };
642
643        match next {
644            Ok(result) => result,
645            Err(_) => Err(Self::pre_send_write_timeout_error(
646                timeout_message,
647                method,
648                request_url,
649            )),
650        }
651    }
652
653    /// Builds a cancellation error for pre-send preparation.
654    ///
655    /// # Parameters
656    /// - `message`: Cancellation message.
657    /// - `method`: Request method for context.
658    /// - `request_url`: Optional request URL for context.
659    ///
660    /// # Returns
661    /// Cancellation [`HttpError`] with request context attached.
662    fn pre_send_cancelled_error(
663        message: &str,
664        method: &Method,
665        request_url: Option<&Url>,
666    ) -> HttpError {
667        let mut error = HttpError::cancelled(message).with_method(method);
668        if let Some(request_url) = request_url {
669            error = error.with_url(request_url);
670        }
671        error
672    }
673
674    /// Builds a write-timeout error for pre-send preparation.
675    ///
676    /// # Parameters
677    /// - `message`: Timeout message.
678    /// - `method`: Request method for context.
679    /// - `request_url`: Optional request URL for context.
680    ///
681    /// # Returns
682    /// Write-timeout [`HttpError`] with request context attached.
683    fn pre_send_write_timeout_error(
684        message: String,
685        method: &Method,
686        request_url: Option<&Url>,
687    ) -> HttpError {
688        let mut error = HttpError::write_timeout(message).with_method(method);
689        if let Some(request_url) = request_url {
690            error = error.with_url(request_url);
691        }
692        error
693    }
694
695    /// Returns the resolved URL for current request fields, computing and
696    /// caching it on demand.
697    ///
698    /// # Returns
699    /// Resolved [`Url`] value (cloned from cache when already computed).
700    ///
701    /// # Errors
702    /// Returns [`HttpError::invalid_url`] when parsing fails, the base URL is
703    /// missing for a relative path, joining fails, or [`Self::ipv4_only`]
704    /// rejects an IPv6 literal host.
705    pub(crate) fn resolved_url(&self) -> Result<Url, HttpError> {
706        let cached = match self.resolved_url.read() {
707            Ok(guard) => guard.clone(),
708            Err(_) => return Err(HttpError::other("Resolved URL cache read lock poisoned")),
709        };
710        if let Some(url) = cached.as_ref() {
711            return Ok(url.clone());
712        }
713        let resolved = self.compute_resolved_url()?;
714        match self.resolved_url.write() {
715            Ok(mut guard) => *guard = Some(resolved.clone()),
716            Err(_) => return Err(HttpError::other("Resolved URL cache write lock poisoned")),
717        }
718        Ok(resolved)
719    }
720
721    /// Returns the resolved URL plus request-builder query parameters.
722    ///
723    /// This mirrors the URL sent by [`Self::send_impl`]: query pairs already
724    /// present in [`Self::path`] are preserved, and pairs from
725    /// [`Self::query`] are appended in insertion order.
726    ///
727    /// # Returns
728    /// Resolved [`Url`] including query parameters from this request snapshot.
729    ///
730    /// # Errors
731    /// Propagates [`Self::resolved_url`] errors for invalid or unresolved URLs.
732    pub(crate) fn resolved_url_with_query(&self) -> Result<Url, HttpError> {
733        let mut url = self.resolved_url()?;
734        if !self.query.is_empty() {
735            {
736                let mut pairs = url.query_pairs_mut();
737                for (key, value) in &self.query {
738                    pairs.append_pair(key, value);
739                }
740            }
741        }
742        Ok(url)
743    }
744
745    /// Returns cached resolved URL when available.
746    pub fn resolved_url_cached(&self) -> Option<Url> {
747        self.resolved_url
748            .read()
749            .map(|guard| guard.clone())
750            .unwrap_or_default()
751    }
752
753    /// Recomputes and stores the current resolved URL.
754    fn refresh_resolved_url_cache(&mut self) {
755        if let Ok(mut guard) = self.resolved_url.write() {
756            *guard = self.compute_resolved_url().ok();
757        }
758    }
759
760    /// Computes the resolved URL from current path/base/ipv4 settings.
761    fn compute_resolved_url(&self) -> Result<Url, HttpError> {
762        if let Ok(url) = Url::parse(&self.path) {
763            self.validate_resolved_url_host(&url)?;
764            return Ok(url);
765        }
766
767        let base = self.context.base_url.as_ref().ok_or_else(|| {
768            HttpError::invalid_url(format!(
769                "Cannot resolve relative path '{}' without base_url",
770                self.path
771            ))
772        })?;
773
774        let url = base.join(&self.path).map_err(|error| {
775            HttpError::invalid_url(format!(
776                "Failed to resolve path '{}' against base URL '{}': {}",
777                self.path, base, error
778            ))
779        })?;
780        self.validate_resolved_url_host(&url)?;
781        Ok(url)
782    }
783
784    /// Enforces [`Self::ipv4_only`] by rejecting IPv6 literal hosts in `url`.
785    ///
786    /// # Parameters
787    /// - `url`: Candidate URL after parsing or joining.
788    ///
789    /// # Returns
790    /// `Ok(())` when the host is acceptable.
791    ///
792    /// # Errors
793    /// [`HttpError::invalid_url`] when `ipv4_only` is `true` and the host is an
794    /// IPv6 literal.
795    fn validate_resolved_url_host(&self, url: &Url) -> Result<(), HttpError> {
796        if self.context.ipv4_only && matches!(url.host(), Some(Host::Ipv6(_))) {
797            return Err(HttpError::invalid_url(format!(
798                "IPv6 literal host is not allowed when ipv4_only=true: {}",
799                url
800            )));
801        }
802        Ok(())
803    }
804
805    /// Returns the attempt-scoped merged outbound headers.
806    ///
807    /// On first call after invalidation, this computes merged headers by
808    /// replaying defaults/injectors/request-local headers and stores them in
809    /// [`Self::effective_headers`]. Later calls in the same attempt return the
810    /// cached map.
811    ///
812    /// Why this API is async:
813    /// - async injectors are part of header assembly and may perform awaitable
814    ///   work (for example token refresh or other I/O-backed value resolution).
815    /// - therefore header materialization cannot be fully synchronous.
816    ///
817    /// Merge order (later wins on duplicates):
818    /// 1. Client default headers snapshot captured when the builder was
819    ///    created.
820    /// 2. Synchronous injector output in registration order.
821    /// 3. Asynchronous injector output in registration order.
822    /// 4. Request-local headers from this snapshot.
823    ///
824    /// # Returns
825    /// Borrowed merged [`HeaderMap`] from the cache.
826    ///
827    /// # Errors
828    /// Propagates failures returned by any injector's `apply` implementation.
829    pub(crate) async fn effective_headers(&mut self) -> HttpResult<&HeaderMap> {
830        if self.effective_headers.is_none() {
831            self.effective_headers = Some(self.compute_effective_headers().await?);
832        }
833        Ok(self
834            .effective_headers
835            .as_ref()
836            .expect("effective headers cache must be populated after computation"))
837    }
838
839    /// Returns cached merged outbound headers when available.
840    pub(crate) fn effective_headers_cached(&self) -> Option<&HeaderMap> {
841        self.effective_headers.as_ref()
842    }
843
844    /// Clears the effective-header cache.
845    ///
846    /// This method invalidates [`Self::effective_headers`] so the next call to
847    /// [`Self::effective_headers`] recomputes merged headers by re-running
848    /// defaults and header injectors.
849    ///
850    /// Why this is needed:
851    /// - request-local headers may have been mutated (`set_header`, `clear_headers`, etc.);
852    /// - injector output may be time-sensitive (for example rotating auth token
853    ///   or timestamp-based signatures), so each send attempt should recompute
854    ///   merged headers instead of reusing stale values from prior attempts.
855    ///
856    /// When to call:
857    /// - immediately before starting a new send attempt;
858    /// - after any mutation that can change final outbound headers.
859    pub(crate) fn invalidate_effective_headers_cache(&mut self) {
860        self.effective_headers = None;
861    }
862
863    /// Computes merged outbound headers without touching the cache.
864    async fn compute_effective_headers(&self) -> HttpResult<HeaderMap> {
865        let mut headers = self.context.default_headers.clone();
866
867        for injector in &self.context.injectors {
868            injector.apply(&mut headers)?;
869        }
870        for injector in &self.context.async_injectors {
871            injector.apply(&mut headers).await?;
872        }
873
874        headers.extend(self.headers.clone());
875        Ok(headers)
876    }
877
878    /// Returns a pre-cancelled [`HttpError`] when a token is present and
879    /// already cancelled.
880    ///
881    /// # Parameters
882    /// - `message`: Human-readable cancellation reason.
883    ///
884    /// # Returns
885    /// `Some` [`HttpError`] (including method context and cached URL when
886    /// available) when a token exists and is already cancelled; otherwise
887    /// `None`.
888    pub(crate) fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
889        if self
890            .execution_options
891            .cancellation_token
892            .as_ref()
893            .is_some_and(CancellationToken::is_cancelled)
894        {
895            let mut error = HttpError::cancelled(message.to_string()).with_method(&self.method);
896            if let Ok(url) = self.resolved_url_with_query() {
897                error = error.with_url(&url);
898            }
899            Some(error)
900        } else {
901            None
902        }
903    }
904
905    /// Attaches the correct reqwest body encoding for each [`HttpRequestBody`]
906    /// variant.
907    ///
908    /// # Parameters
909    /// - `builder`: Partially configured [`reqwest::RequestBuilder`]
910    ///   (method/URL/headers already set).
911    /// - `body`: Payload variant to attach; moved into the builder.
912    ///
913    /// # Returns
914    /// The same builder with an appropriate `.body(...)` applied (or unchanged
915    /// for [`HttpRequestBody::Empty`]).
916    fn apply_request_body(
917        builder: reqwest::RequestBuilder,
918        body: HttpRequestBody,
919    ) -> reqwest::RequestBuilder {
920        match body {
921            HttpRequestBody::Empty => builder,
922            HttpRequestBody::Bytes(bytes)
923            | HttpRequestBody::Json(bytes)
924            | HttpRequestBody::Form(bytes)
925            | HttpRequestBody::Multipart(bytes)
926            | HttpRequestBody::Ndjson(bytes) => builder.body(bytes),
927            HttpRequestBody::Stream(chunks) => {
928                let body_stream = futures_stream::iter(
929                    chunks.into_iter().map(Result::<Bytes, std::io::Error>::Ok),
930                );
931                builder.body(reqwest::Body::wrap_stream(body_stream))
932            }
933            HttpRequestBody::Text(text) => builder.body(text),
934        }
935    }
936}
937
938/// Exercises request cache refresh branches for coverage-only tests.
939///
940/// # Returns
941/// Resolved URL and effective header count diagnostics.
942#[cfg(coverage)]
943#[doc(hidden)]
944pub(crate) async fn coverage_exercise_request_cache_paths() -> Vec<String> {
945    let client = crate::HttpClientFactory::new()
946        .create_default()
947        .expect("coverage HTTP client should build");
948    let mut request = client
949        .request(Method::GET, "https://example.com/coverage-cache")
950        .build();
951    *request
952        .resolved_url
953        .write()
954        .expect("resolved URL cache lock should not be poisoned") = None;
955    let url = request
956        .resolved_url()
957        .expect("coverage request URL should resolve");
958    let header_count = request
959        .effective_headers()
960        .await
961        .expect("coverage headers should compute")
962        .len();
963    let cached_header_count = request
964        .effective_headers()
965        .await
966        .expect("coverage headers should be cached")
967        .len();
968    vec![
969        url.to_string(),
970        header_count.to_string(),
971        cached_header_count.to_string(),
972    ]
973}
974
975impl Clone for HttpRequest {
976    fn clone(&self) -> Self {
977        Self {
978            method: self.method.clone(),
979            path: self.path.clone(),
980            query: self.query.clone(),
981            headers: self.headers.clone(),
982            body: self.body.clone(),
983            streaming_body: self.streaming_body.clone(),
984            resolved_url: RwLock::new(self.resolved_url_cached()),
985            effective_headers: self.effective_headers.clone(),
986            execution_options: self.execution_options.clone(),
987            context: self.context.clone(),
988        }
989    }
990}