Skip to main content

qubit_http/request/
http_request.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9//! Immutable HTTP request object.
10
11use std::future::Future;
12use std::sync::RwLock;
13use std::time::Duration;
14
15use bytes::Bytes;
16use futures_util::stream as futures_stream;
17use http::{HeaderMap, HeaderName, HeaderValue, Method};
18use qubit_function::MutatingFunction;
19use reqwest::Response;
20use tokio_util::sync::CancellationToken;
21use url::Host;
22use url::Url;
23
24use crate::error::{backend_error_mapper::map_reqwest_error, ReqwestErrorPhase};
25use crate::{
26    AsyncHttpHeaderInjector, HttpError, HttpErrorKind, HttpHeaderInjector, HttpLogger,
27    HttpRequestStreamingBody, HttpResult,
28};
29
30use super::http_request_body::HttpRequestBody;
31use super::http_request_builder::HttpRequestBuilder;
32use super::http_request_retry_override::HttpRequestRetryOverride;
33use super::parse_header;
34
35/// Request execution options (timeouts, cancellation, and retry override).
36#[derive(Debug, Clone)]
37struct HttpRequestExecutionOptions {
38    /// Overrides client-wide request timeout when set; otherwise client default applies.
39    request_timeout: Option<Duration>,
40    /// Per-request write timeout used during request sending.
41    write_timeout: Duration,
42    /// Per-request read timeout used during response body reads.
43    read_timeout: Duration,
44    /// Optional cancellation token checked before send and during I/O phases.
45    cancellation_token: Option<CancellationToken>,
46    /// Per-request retry override (enable/disable/method-policy/Retry-After behavior).
47    retry_override: HttpRequestRetryOverride,
48}
49
50/// Request context captured from the originating client.
51#[derive(Debug, Clone)]
52struct HttpRequestContext {
53    /// Base URL copied from client options, used to resolve relative `path`.
54    base_url: Option<Url>,
55    /// Whether resolved URLs must avoid IPv6 literal hosts.
56    ipv4_only: bool,
57    /// Client default headers snapshot captured when this request builder was created.
58    default_headers: HeaderMap,
59    /// Client sync header injectors snapshot captured when this request builder was created.
60    injectors: Vec<HttpHeaderInjector>,
61    /// Client async header injectors snapshot captured when this request builder was created.
62    async_injectors: Vec<AsyncHttpHeaderInjector>,
63}
64
65/// Immutable snapshot of a single HTTP call produced by
66/// [`crate::HttpRequestBuilder`].
67#[derive(Debug)]
68pub struct HttpRequest {
69    /// HTTP method (GET, POST, …).
70    method: Method,
71    /// Absolute URL string, or path joined with client `base_url` when not
72    /// parseable as URL.
73    path: String,
74    /// Query string parameters as `(name, value)` pairs.
75    query: Vec<(String, String)>,
76    /// Headers added on top of client defaults and injector output.
77    headers: HeaderMap,
78    /// Serialized body variant.
79    body: HttpRequestBody,
80    /// Deferred per-attempt streaming body factory.
81    streaming_body: Option<HttpRequestStreamingBody>,
82    /// Lazily maintained cache for the currently resolved URL.
83    resolved_url: RwLock<Option<Url>>,
84    /// Attempt-scoped cache of merged outbound headers after applying
85    /// defaults/injectors/request-local headers.
86    effective_headers: Option<HeaderMap>,
87    /// Request execution options and runtime controls.
88    execution_options: HttpRequestExecutionOptions,
89    /// Client-derived context for URL and header resolution.
90    context: HttpRequestContext,
91}
92
93impl HttpRequest {
94    /// Consumes a finished [`HttpRequestBuilder`] and freezes its fields into
95    /// an [`HttpRequest`].
96    ///
97    /// # Parameters
98    /// - `builder`: Populated builder produced by the HTTP client pipeline.
99    ///
100    /// # Returns
101    /// Snapshot ready for URL resolution, header assembly, and sending.
102    pub(super) fn new(builder: HttpRequestBuilder) -> Self {
103        let mut request = Self {
104            method: builder.method,
105            path: builder.path,
106            query: builder.query,
107            headers: builder.headers,
108            body: builder.body,
109            streaming_body: builder.streaming_body,
110            resolved_url: RwLock::new(None),
111            effective_headers: None,
112            execution_options: HttpRequestExecutionOptions {
113                request_timeout: builder.request_timeout,
114                write_timeout: builder.write_timeout,
115                read_timeout: builder.read_timeout,
116                cancellation_token: builder.cancellation_token,
117                retry_override: builder.retry_override,
118            },
119            context: HttpRequestContext {
120                base_url: builder.base_url,
121                ipv4_only: builder.ipv4_only,
122                default_headers: builder.default_headers,
123                injectors: builder.injectors,
124                async_injectors: builder.async_injectors,
125            },
126        };
127        request.refresh_resolved_url_cache();
128        request
129    }
130
131    /// Returns the HTTP verb for this snapshot.
132    ///
133    /// # Returns
134    /// Borrowed [`Method`] (for example GET or POST).
135    pub fn method(&self) -> &Method {
136        &self.method
137    }
138
139    /// Replaces the HTTP verb.
140    ///
141    /// # Parameters
142    /// - `method`: New [`Method`].
143    ///
144    /// # Returns
145    /// `self` for method chaining.
146    pub fn set_method(&mut self, method: Method) -> &mut Self {
147        self.method = method;
148        self
149    }
150
151    /// Returns the path segment or absolute URL string stored on this request.
152    ///
153    /// # Returns
154    /// The raw path/URL before query string assembly; may be relative if a base
155    /// URL is set.
156    pub fn path(&self) -> &str {
157        &self.path
158    }
159
160    /// Replaces the path or absolute URL string.
161    ///
162    /// # Parameters
163    /// - `path`: New path or URL string (query string is managed separately via
164    ///   [`Self::add_query_param`]).
165    ///
166    /// # Returns
167    /// `self` for method chaining.
168    pub fn set_path(&mut self, path: &str) -> &mut Self {
169        self.path = path.to_string();
170        self.refresh_resolved_url_cache();
171        self
172    }
173
174    /// Returns ordered `(name, value)` query pairs that will be appended to the
175    /// resolved URL.
176    ///
177    /// # Returns
178    /// Slice view of accumulated query parameters.
179    pub fn query(&self) -> &[(String, String)] {
180        &self.query
181    }
182
183    /// Appends a single query pair preserving insertion order.
184    ///
185    /// # Parameters
186    /// - `key`: Parameter name.
187    /// - `value`: Parameter value.
188    ///
189    /// # Returns
190    /// `self` for method chaining.
191    pub fn add_query_param(&mut self, key: &str, value: &str) -> &mut Self {
192        self.query.push((key.to_string(), value.to_string()));
193        self
194    }
195
196    /// Removes every query pair from this snapshot.
197    ///
198    /// # Returns
199    /// `self` for method chaining.
200    pub fn clear_query_params(&mut self) -> &mut Self {
201        self.query.clear();
202        self
203    }
204
205    /// Returns request-local headers layered on top of client defaults and
206    /// injector output at send time.
207    ///
208    /// # Returns
209    /// Borrowed [`HeaderMap`] owned by this request only (not merged defaults).
210    pub fn headers(&self) -> &HeaderMap {
211        &self.headers
212    }
213
214    /// Parses and inserts one header from string name/value pairs.
215    ///
216    /// # Parameters
217    /// - `name`: Header field name.
218    /// - `value`: Header field value.
219    ///
220    /// # Returns
221    /// `Ok(self)` on success.
222    ///
223    /// # Errors
224    /// Returns [`HttpError`] when name or value cannot be converted into valid
225    /// HTTP tokens.
226    pub fn set_header(&mut self, name: &str, value: &str) -> Result<&mut Self, HttpError> {
227        let (header_name, header_value) = parse_header(name, value)?;
228        self.headers.insert(header_name, header_value);
229        self.invalidate_effective_headers_cache();
230        Ok(self)
231    }
232
233    /// Inserts one header using pre-validated [`HeaderName`] / [`HeaderValue`]
234    /// types.
235    ///
236    /// # Parameters
237    /// - `name`: Typed header name.
238    /// - `value`: Typed header value.
239    ///
240    /// # Returns
241    /// `self` for method chaining.
242    pub fn set_typed_header(&mut self, name: HeaderName, value: HeaderValue) -> &mut Self {
243        self.headers.insert(name, value);
244        self.invalidate_effective_headers_cache();
245        self
246    }
247
248    /// Removes all values for a header field by typed name.
249    ///
250    /// # Parameters
251    /// - `name`: Header name to strip from the request-local map.
252    ///
253    /// # Returns
254    /// `self` for method chaining.
255    pub fn remove_header(&mut self, name: &HeaderName) -> &mut Self {
256        self.headers.remove(name);
257        self.invalidate_effective_headers_cache();
258        self
259    }
260
261    /// Clears all request-local headers (defaults and injectors are unaffected
262    /// until send).
263    ///
264    /// # Returns
265    /// `self` for method chaining.
266    pub fn clear_headers(&mut self) -> &mut Self {
267        self.headers.clear();
268        self.invalidate_effective_headers_cache();
269        self
270    }
271
272    /// Returns the serialized body variant for this snapshot.
273    ///
274    /// # Returns
275    /// Borrowed [`HttpRequestBody`].
276    pub fn body(&self) -> &HttpRequestBody {
277        &self.body
278    }
279
280    /// Replaces the entire body payload.
281    ///
282    /// # Parameters
283    /// - `body`: New [`HttpRequestBody`] variant.
284    ///
285    /// # Returns
286    /// `self` for method chaining.
287    pub fn set_body(&mut self, body: HttpRequestBody) -> &mut Self {
288        self.body = body;
289        self.streaming_body = None;
290        self
291    }
292
293    /// Sets deferred streaming upload body factory for this request.
294    ///
295    /// # Parameters
296    /// - `streaming_body`: Deferred body stream factory reused across retries.
297    ///
298    /// # Returns
299    /// `self` for method chaining.
300    pub fn set_streaming_body(&mut self, streaming_body: HttpRequestStreamingBody) -> &mut Self {
301        self.streaming_body = Some(streaming_body);
302        self.body = HttpRequestBody::Empty;
303        self
304    }
305
306    /// Returns the per-request total timeout, if any.
307    ///
308    /// # Returns
309    /// `Some(duration)` when a request-specific timeout overrides the client
310    /// default; otherwise `None`.
311    pub fn request_timeout(&self) -> Option<Duration> {
312        self.execution_options.request_timeout
313    }
314
315    /// Sets a per-request total timeout that overrides the client default for
316    /// this send.
317    ///
318    /// # Parameters
319    /// - `timeout`: Upper bound for the entire request lifecycle handled by
320    ///   reqwest.
321    ///
322    /// # Returns
323    /// `self` for method chaining.
324    pub fn set_request_timeout(&mut self, timeout: Duration) -> &mut Self {
325        self.execution_options.request_timeout = Some(timeout);
326        self
327    }
328
329    /// Drops the per-request timeout so the client-wide default applies again.
330    ///
331    /// # Returns
332    /// `self` for method chaining.
333    pub fn clear_request_timeout(&mut self) -> &mut Self {
334        self.execution_options.request_timeout = None;
335        self
336    }
337
338    /// Returns the write-phase timeout used while sending the request.
339    pub fn write_timeout(&self) -> Duration {
340        self.execution_options.write_timeout
341    }
342
343    /// Sets the write-phase timeout used while sending the request.
344    pub fn set_write_timeout(&mut self, timeout: Duration) -> &mut Self {
345        self.execution_options.write_timeout = timeout;
346        self
347    }
348
349    /// Returns the read-phase timeout used while reading response body bytes.
350    pub fn read_timeout(&self) -> Duration {
351        self.execution_options.read_timeout
352    }
353
354    /// Sets the read-phase timeout used while reading response body bytes.
355    pub fn set_read_timeout(&mut self, timeout: Duration) -> &mut Self {
356        self.execution_options.read_timeout = timeout;
357        self
358    }
359
360    /// Returns the optional base URL used to resolve relative [`Self::path`]
361    /// values.
362    ///
363    /// # Returns
364    /// `Some` when a base is configured; `None` when only absolute URLs in
365    /// `path` are valid.
366    pub fn base_url(&self) -> Option<&Url> {
367        self.context.base_url.as_ref()
368    }
369
370    /// Sets the base URL used for internal URL resolution when `path` is not
371    /// absolute.
372    ///
373    /// # Parameters
374    /// - `base_url`: Root URL to join against relative paths.
375    ///
376    /// # Returns
377    /// `self` for method chaining.
378    pub fn set_base_url(&mut self, base_url: Url) -> &mut Self {
379        self.context.base_url = Some(base_url);
380        self.refresh_resolved_url_cache();
381        self
382    }
383
384    /// Removes the configured base URL so relative paths can no longer be
385    /// resolved without resetting it.
386    ///
387    /// # Returns
388    /// `self` for method chaining.
389    pub fn clear_base_url(&mut self) -> &mut Self {
390        self.context.base_url = None;
391        self.refresh_resolved_url_cache();
392        self
393    }
394
395    /// Returns whether IPv6 literal hosts are rejected after URL resolution.
396    ///
397    /// # Returns
398    /// `true` when a resolved URL whose host is an IPv6 literal must be
399    /// rejected with [`HttpError::invalid_url`].
400    pub fn ipv4_only(&self) -> bool {
401        self.context.ipv4_only
402    }
403
404    /// Enables or disables IPv6 literal host rejection for resolved URLs.
405    ///
406    /// # Parameters
407    /// - `enabled`: When `true`, resolved URLs whose host is an IPv6 literal
408    ///   are errors.
409    ///
410    /// # Returns
411    /// `self` for method chaining.
412    pub fn set_ipv4_only(&mut self, enabled: bool) -> &mut Self {
413        self.context.ipv4_only = enabled;
414        self.refresh_resolved_url_cache();
415        self
416    }
417
418    /// Returns the cooperative cancellation handle, if configured.
419    ///
420    /// # Returns
421    /// `Some` token checked before send and during I/O; `None` when
422    /// cancellation is not wired.
423    pub fn cancellation_token(&self) -> Option<&CancellationToken> {
424        self.execution_options.cancellation_token.as_ref()
425    }
426
427    /// Attaches a [`CancellationToken`] that can abort this request
428    /// cooperatively.
429    ///
430    /// # Parameters
431    /// - `token`: Shared cancellation source.
432    ///
433    /// # Returns
434    /// `self` for method chaining.
435    pub fn set_cancellation_token(&mut self, token: CancellationToken) -> &mut Self {
436        self.execution_options.cancellation_token = Some(token);
437        self
438    }
439
440    /// Removes any cancellation token from this snapshot.
441    ///
442    /// # Returns
443    /// `self` for method chaining.
444    pub fn clear_cancellation_token(&mut self) -> &mut Self {
445        self.execution_options.cancellation_token = None;
446        self
447    }
448
449    /// Returns the per-request retry override applied by the client pipeline.
450    ///
451    /// # Returns
452    /// Borrowed [`HttpRequestRetryOverride`].
453    pub fn retry_override(&self) -> &HttpRequestRetryOverride {
454        &self.execution_options.retry_override
455    }
456
457    /// Replaces the retry override for this single request.
458    ///
459    /// # Parameters
460    /// - `retry_override`: New override policy and knobs.
461    ///
462    /// # Returns
463    /// `self` for method chaining.
464    pub fn set_retry_override(&mut self, retry_override: HttpRequestRetryOverride) -> &mut Self {
465        self.execution_options.retry_override = retry_override;
466        self
467    }
468
469    /// Moves the current body out, leaving [`HttpRequestBody::Empty`] in its
470    /// place.
471    ///
472    /// Used internally before handing the payload to reqwest so the snapshot is
473    /// not cloned twice.
474    ///
475    /// # Returns
476    /// Previous [`HttpRequestBody`] value.
477    pub(crate) fn take_body(&mut self) -> HttpRequestBody {
478        std::mem::replace(&mut self.body, HttpRequestBody::Empty)
479    }
480
481    /// Assembles a reqwest [`RequestBuilder`](reqwest::RequestBuilder), applies
482    /// this snapshot's body, then sends with a bounded write phase.
483    ///
484    /// Centralizes send-attempt preparation and transport wiring:
485    /// - invalidates and recomputes effective headers for each attempt;
486    /// - emits request TRACE logs via the provided logger;
487    /// - applies query/timeout/body wiring plus cooperative cancellation and
488    ///   write-timeout handling.
489    ///
490    /// # Parameters
491    /// - `backend`: Shared reqwest client.
492    /// - `logger`: Attempt-scoped request logger.
493    ///
494    /// # Returns
495    /// The successful [`Response`] or a mapped [`HttpError`].
496    ///
497    /// # Errors
498    /// - Cooperative cancellation while waiting on the send future.
499    /// - Transport failures mapped from reqwest.
500    /// - Write timeout when the send future does not complete within
501    ///   `write_timeout`.
502    pub(crate) async fn send_impl(
503        &mut self,
504        backend: &reqwest::Client,
505        logger: &HttpLogger<'_>,
506    ) -> HttpResult<Response> {
507        // Effective headers are cached on the request. Each send attempt must
508        // invalidate and recompute them so injector output and request mutations
509        // are refreshed instead of reusing stale headers from prior attempts.
510        self.invalidate_effective_headers_cache();
511        let method = self.method().clone();
512        let request_url_context = self.resolved_url_with_query().ok();
513        let write_timeout = self.execution_options.write_timeout;
514        let cancellation_token = self.execution_options.cancellation_token.clone();
515        let headers = Self::await_pre_send_future(
516            self.effective_headers(),
517            write_timeout,
518            cancellation_token,
519            &method,
520            request_url_context.as_ref(),
521            "Request cancelled while preparing request",
522            format!(
523                "Write timeout after {:?} while preparing request",
524                write_timeout
525            ),
526        )
527        .await?
528        .clone();
529        let url = self.resolved_url()?;
530        let request_url = self.resolved_url_with_query()?;
531        // Log the request after computing effective headers so TRACE logs
532        // include the same query string used by the actual send path.
533        logger.log_request(self);
534        let mut builder = backend.request(method.clone(), url.clone());
535        builder = builder.headers(headers);
536        if !self.query.is_empty() {
537            builder = builder.query(self.query.as_slice());
538        }
539        if let Some(timeout) = self.execution_options.request_timeout {
540            builder = builder.timeout(timeout);
541        }
542        if let Some(streaming_body) = self.streaming_body.as_ref() {
543            let body = Self::await_pre_send_future(
544                async { Ok(streaming_body.to_reqwest_body().await) },
545                self.execution_options.write_timeout,
546                self.execution_options.cancellation_token.clone(),
547                &method,
548                Some(&request_url),
549                "Request cancelled while preparing streaming request body",
550                format!(
551                    "Write timeout after {:?} while preparing streaming request body",
552                    self.execution_options.write_timeout
553                ),
554            )
555            .await?;
556            builder = builder.body(body);
557        } else {
558            builder = Self::apply_request_body(builder, self.take_body());
559        }
560
561        let send_future =
562            tokio::time::timeout(self.execution_options.write_timeout, builder.send());
563        let next = if let Some(token) = self.execution_options.cancellation_token.as_ref() {
564            tokio::select! {
565                _ = token.cancelled() => {
566                    return Err(HttpError::cancelled("Request cancelled while sending")
567                        .with_method(&method)
568                        .with_url(&request_url));
569                }
570                send_result = send_future => send_result,
571            }
572        } else {
573            send_future.await
574        };
575
576        match next {
577            Ok(Ok(response)) => Ok(response),
578            Ok(Err(error)) => Err(map_reqwest_error(
579                error,
580                HttpErrorKind::Transport,
581                Some(ReqwestErrorPhase::Send),
582                Some(method.clone()),
583                Some(request_url.clone()),
584            )),
585            Err(_) => Err(HttpError::write_timeout(format!(
586                "Write timeout after {:?} while sending request",
587                self.execution_options.write_timeout
588            ))
589            .with_method(&method)
590            .with_url(&request_url)),
591        }
592    }
593
594    /// Waits for one asynchronous pre-send preparation step with cancellation and
595    /// write-timeout handling.
596    ///
597    /// # Parameters
598    /// - `future`: Preparation future, such as async header injection or streaming
599    ///   body factory execution.
600    /// - `write_timeout`: Timeout budget reused for send preparation.
601    /// - `cancellation_token`: Optional request cancellation token.
602    /// - `method`: Request method for error context.
603    /// - `request_url`: Optional resolved request URL for error context.
604    /// - `cancellation_message`: Message used when cancellation wins.
605    /// - `timeout_message`: Message used when timeout wins.
606    ///
607    /// # Returns
608    /// The future output when it completes before cancellation or timeout.
609    ///
610    /// # Errors
611    /// Returns [`HttpErrorKind::Cancelled`] on cancellation,
612    /// [`HttpErrorKind::WriteTimeout`] on timeout, or propagates the future's own
613    /// error.
614    async fn await_pre_send_future<T, F>(
615        future: F,
616        write_timeout: Duration,
617        cancellation_token: Option<CancellationToken>,
618        method: &Method,
619        request_url: Option<&Url>,
620        cancellation_message: &str,
621        timeout_message: String,
622    ) -> HttpResult<T>
623    where
624        F: Future<Output = HttpResult<T>>,
625    {
626        let timed = tokio::time::timeout(write_timeout, future);
627        let next = if let Some(token) = cancellation_token.as_ref() {
628            tokio::select! {
629                _ = token.cancelled() => {
630                    return Err(Self::pre_send_cancelled_error(
631                        cancellation_message,
632                        method,
633                        request_url,
634                    ));
635                }
636                result = timed => result,
637            }
638        } else {
639            timed.await
640        };
641
642        match next {
643            Ok(result) => result,
644            Err(_) => Err(Self::pre_send_write_timeout_error(
645                timeout_message,
646                method,
647                request_url,
648            )),
649        }
650    }
651
652    /// Builds a cancellation error for pre-send preparation.
653    ///
654    /// # Parameters
655    /// - `message`: Cancellation message.
656    /// - `method`: Request method for context.
657    /// - `request_url`: Optional request URL for context.
658    ///
659    /// # Returns
660    /// Cancellation [`HttpError`] with request context attached.
661    fn pre_send_cancelled_error(
662        message: &str,
663        method: &Method,
664        request_url: Option<&Url>,
665    ) -> HttpError {
666        let mut error = HttpError::cancelled(message).with_method(method);
667        if let Some(request_url) = request_url {
668            error = error.with_url(request_url);
669        }
670        error
671    }
672
673    /// Builds a write-timeout error for pre-send preparation.
674    ///
675    /// # Parameters
676    /// - `message`: Timeout message.
677    /// - `method`: Request method for context.
678    /// - `request_url`: Optional request URL for context.
679    ///
680    /// # Returns
681    /// Write-timeout [`HttpError`] with request context attached.
682    fn pre_send_write_timeout_error(
683        message: String,
684        method: &Method,
685        request_url: Option<&Url>,
686    ) -> HttpError {
687        let mut error = HttpError::write_timeout(message).with_method(method);
688        if let Some(request_url) = request_url {
689            error = error.with_url(request_url);
690        }
691        error
692    }
693
694    /// Returns the resolved URL for current request fields, computing and
695    /// caching it on demand.
696    ///
697    /// # Returns
698    /// Resolved [`Url`] value (cloned from cache when already computed).
699    ///
700    /// # Errors
701    /// Returns [`HttpError::invalid_url`] when parsing fails, the base URL is
702    /// missing for a relative path, joining fails, or [`Self::ipv4_only`]
703    /// rejects an IPv6 literal host.
704    pub(crate) fn resolved_url(&self) -> Result<Url, HttpError> {
705        let cached = match self.resolved_url.read() {
706            Ok(guard) => guard.clone(),
707            Err(_) => return Err(HttpError::other("Resolved URL cache read lock poisoned")),
708        };
709        if let Some(url) = cached.as_ref() {
710            return Ok(url.clone());
711        }
712        let resolved = self.compute_resolved_url()?;
713        match self.resolved_url.write() {
714            Ok(mut guard) => *guard = Some(resolved.clone()),
715            Err(_) => return Err(HttpError::other("Resolved URL cache write lock poisoned")),
716        }
717        Ok(resolved)
718    }
719
720    /// Returns the resolved URL plus request-builder query parameters.
721    ///
722    /// This mirrors the URL sent by [`Self::send_impl`]: query pairs already
723    /// present in [`Self::path`] are preserved, and pairs from
724    /// [`Self::query`] are appended in insertion order.
725    ///
726    /// # Returns
727    /// Resolved [`Url`] including query parameters from this request snapshot.
728    ///
729    /// # Errors
730    /// Propagates [`Self::resolved_url`] errors for invalid or unresolved URLs.
731    pub(crate) fn resolved_url_with_query(&self) -> Result<Url, HttpError> {
732        let mut url = self.resolved_url()?;
733        if !self.query.is_empty() {
734            {
735                let mut pairs = url.query_pairs_mut();
736                for (key, value) in &self.query {
737                    pairs.append_pair(key, value);
738                }
739            }
740        }
741        Ok(url)
742    }
743
744    /// Returns cached resolved URL when available.
745    pub fn resolved_url_cached(&self) -> Option<Url> {
746        self.resolved_url
747            .read()
748            .map(|guard| guard.clone())
749            .unwrap_or_default()
750    }
751
752    /// Recomputes and stores the current resolved URL.
753    fn refresh_resolved_url_cache(&mut self) {
754        if let Ok(mut guard) = self.resolved_url.write() {
755            *guard = self.compute_resolved_url().ok();
756        }
757    }
758
759    /// Computes the resolved URL from current path/base/ipv4 settings.
760    fn compute_resolved_url(&self) -> Result<Url, HttpError> {
761        if let Ok(url) = Url::parse(&self.path) {
762            self.validate_resolved_url_host(&url)?;
763            return Ok(url);
764        }
765
766        let base = self.context.base_url.as_ref().ok_or_else(|| {
767            HttpError::invalid_url(format!(
768                "Cannot resolve relative path '{}' without base_url",
769                self.path
770            ))
771        })?;
772
773        let url = base.join(&self.path).map_err(|error| {
774            HttpError::invalid_url(format!(
775                "Failed to resolve path '{}' against base URL '{}': {}",
776                self.path, base, error
777            ))
778        })?;
779        self.validate_resolved_url_host(&url)?;
780        Ok(url)
781    }
782
783    /// Enforces [`Self::ipv4_only`] by rejecting IPv6 literal hosts in `url`.
784    ///
785    /// # Parameters
786    /// - `url`: Candidate URL after parsing or joining.
787    ///
788    /// # Returns
789    /// `Ok(())` when the host is acceptable.
790    ///
791    /// # Errors
792    /// [`HttpError::invalid_url`] when `ipv4_only` is `true` and the host is an
793    /// IPv6 literal.
794    fn validate_resolved_url_host(&self, url: &Url) -> Result<(), HttpError> {
795        if self.context.ipv4_only && matches!(url.host(), Some(Host::Ipv6(_))) {
796            return Err(HttpError::invalid_url(format!(
797                "IPv6 literal host is not allowed when ipv4_only=true: {}",
798                url
799            )));
800        }
801        Ok(())
802    }
803
804    /// Returns the attempt-scoped merged outbound headers.
805    ///
806    /// On first call after invalidation, this computes merged headers by
807    /// replaying defaults/injectors/request-local headers and stores them in
808    /// [`Self::effective_headers`]. Later calls in the same attempt return the
809    /// cached map.
810    ///
811    /// Why this API is async:
812    /// - async injectors are part of header assembly and may perform awaitable
813    ///   work (for example token refresh or other I/O-backed value resolution).
814    /// - therefore header materialization cannot be fully synchronous.
815    ///
816    /// Merge order (later wins on duplicates):
817    /// 1. Client default headers snapshot captured when the builder was
818    ///    created.
819    /// 2. Synchronous injector output in registration order.
820    /// 3. Asynchronous injector output in registration order.
821    /// 4. Request-local headers from this snapshot.
822    ///
823    /// # Returns
824    /// Borrowed merged [`HeaderMap`] from the cache.
825    ///
826    /// # Errors
827    /// Propagates failures returned by any injector's `apply` implementation.
828    pub(crate) async fn effective_headers(&mut self) -> HttpResult<&HeaderMap> {
829        if self.effective_headers.is_none() {
830            self.effective_headers = Some(self.compute_effective_headers().await?);
831        }
832        Ok(self
833            .effective_headers
834            .as_ref()
835            .expect("effective headers cache must be populated after computation"))
836    }
837
838    /// Returns cached merged outbound headers when available.
839    pub(crate) fn effective_headers_cached(&self) -> Option<&HeaderMap> {
840        self.effective_headers.as_ref()
841    }
842
843    /// Clears the effective-header cache.
844    ///
845    /// This method invalidates [`Self::effective_headers`] so the next call to
846    /// [`Self::effective_headers`] recomputes merged headers by re-running
847    /// defaults and header injectors.
848    ///
849    /// Why this is needed:
850    /// - request-local headers may have been mutated (`set_header`, `clear_headers`, etc.);
851    /// - injector output may be time-sensitive (for example rotating auth token
852    ///   or timestamp-based signatures), so each send attempt should recompute
853    ///   merged headers instead of reusing stale values from prior attempts.
854    ///
855    /// When to call:
856    /// - immediately before starting a new send attempt;
857    /// - after any mutation that can change final outbound headers.
858    pub(crate) fn invalidate_effective_headers_cache(&mut self) {
859        self.effective_headers = None;
860    }
861
862    /// Computes merged outbound headers without touching the cache.
863    async fn compute_effective_headers(&self) -> HttpResult<HeaderMap> {
864        let mut headers = self.context.default_headers.clone();
865
866        for injector in &self.context.injectors {
867            injector.apply(&mut headers)?;
868        }
869        for injector in &self.context.async_injectors {
870            injector.apply(&mut headers).await?;
871        }
872
873        headers.extend(self.headers.clone());
874        Ok(headers)
875    }
876
877    /// Returns a pre-cancelled [`HttpError`] when a token is present and
878    /// already cancelled.
879    ///
880    /// # Parameters
881    /// - `message`: Human-readable cancellation reason.
882    ///
883    /// # Returns
884    /// `Some` [`HttpError`] (including method context and cached URL when
885    /// available) when a token exists and is already cancelled; otherwise
886    /// `None`.
887    pub(crate) fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
888        if self
889            .execution_options
890            .cancellation_token
891            .as_ref()
892            .is_some_and(CancellationToken::is_cancelled)
893        {
894            let mut error = HttpError::cancelled(message.to_string()).with_method(&self.method);
895            if let Ok(url) = self.resolved_url_with_query() {
896                error = error.with_url(&url);
897            }
898            Some(error)
899        } else {
900            None
901        }
902    }
903
904    /// Attaches the correct reqwest body encoding for each [`HttpRequestBody`]
905    /// variant.
906    ///
907    /// # Parameters
908    /// - `builder`: Partially configured [`reqwest::RequestBuilder`]
909    ///   (method/URL/headers already set).
910    /// - `body`: Payload variant to attach; moved into the builder.
911    ///
912    /// # Returns
913    /// The same builder with an appropriate `.body(...)` applied (or unchanged
914    /// for [`HttpRequestBody::Empty`]).
915    fn apply_request_body(
916        builder: reqwest::RequestBuilder,
917        body: HttpRequestBody,
918    ) -> reqwest::RequestBuilder {
919        match body {
920            HttpRequestBody::Empty => builder,
921            HttpRequestBody::Bytes(bytes)
922            | HttpRequestBody::Json(bytes)
923            | HttpRequestBody::Form(bytes)
924            | HttpRequestBody::Multipart(bytes)
925            | HttpRequestBody::Ndjson(bytes) => builder.body(bytes),
926            HttpRequestBody::Stream(chunks) => {
927                let body_stream = futures_stream::iter(
928                    chunks.into_iter().map(Result::<Bytes, std::io::Error>::Ok),
929                );
930                builder.body(reqwest::Body::wrap_stream(body_stream))
931            }
932            HttpRequestBody::Text(text) => builder.body(text),
933        }
934    }
935}
936
937/// Exercises request cache refresh branches for coverage-only tests.
938///
939/// # Returns
940/// Resolved URL and effective header count diagnostics.
941#[cfg(coverage)]
942#[doc(hidden)]
943pub(crate) async fn coverage_exercise_request_cache_paths() -> Vec<String> {
944    let client = crate::HttpClientFactory::new()
945        .create_default()
946        .expect("coverage HTTP client should build");
947    let mut request = client
948        .request(Method::GET, "https://example.com/coverage-cache")
949        .build();
950    *request
951        .resolved_url
952        .write()
953        .expect("resolved URL cache lock should not be poisoned") = None;
954    let url = request
955        .resolved_url()
956        .expect("coverage request URL should resolve");
957    let header_count = request
958        .effective_headers()
959        .await
960        .expect("coverage headers should compute")
961        .len();
962    let cached_header_count = request
963        .effective_headers()
964        .await
965        .expect("coverage headers should be cached")
966        .len();
967    vec![
968        url.to_string(),
969        header_count.to_string(),
970        cached_header_count.to_string(),
971    ]
972}
973
974impl Clone for HttpRequest {
975    fn clone(&self) -> Self {
976        Self {
977            method: self.method.clone(),
978            path: self.path.clone(),
979            query: self.query.clone(),
980            headers: self.headers.clone(),
981            body: self.body.clone(),
982            streaming_body: self.streaming_body.clone(),
983            resolved_url: RwLock::new(self.resolved_url_cached()),
984            effective_headers: self.effective_headers.clone(),
985            execution_options: self.execution_options.clone(),
986            context: self.context.clone(),
987        }
988    }
989}