Skip to main content

qubit_http/request/
http_request.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Immutable HTTP request object.
11
12use std::future::Future;
13use std::sync::RwLock;
14use std::time::Duration;
15
16use bytes::Bytes;
17use futures_util::stream as futures_stream;
18use http::{
19    HeaderMap,
20    HeaderName,
21    HeaderValue,
22    Method,
23};
24use qubit_function::MutatingFunction;
25use reqwest::Response;
26use tokio_util::sync::CancellationToken;
27use url::Host;
28use url::Url;
29
30use crate::error::{
31    backend_error_mapper::map_reqwest_error,
32    ReqwestErrorPhase,
33};
34use crate::{
35    AsyncHttpHeaderInjector,
36    HttpError,
37    HttpErrorKind,
38    HttpHeaderInjector,
39    HttpLogger,
40    HttpRequestStreamingBody,
41    HttpResult,
42};
43
44use super::http_request_body::HttpRequestBody;
45use super::http_request_builder::HttpRequestBuilder;
46use super::http_request_retry_override::HttpRequestRetryOverride;
47use super::parse_header;
48
49/// Request execution options (timeouts, cancellation, and retry override).
50#[derive(Debug, Clone)]
51struct HttpRequestExecutionOptions {
52    /// Overrides client-wide request timeout when set; otherwise client default applies.
53    request_timeout: Option<Duration>,
54    /// Per-request write timeout used during request sending.
55    write_timeout: Duration,
56    /// Per-request read timeout used during response body reads.
57    read_timeout: Duration,
58    /// Optional cancellation token checked before send and during I/O phases.
59    cancellation_token: Option<CancellationToken>,
60    /// Per-request retry override (enable/disable/method-policy/Retry-After behavior).
61    retry_override: HttpRequestRetryOverride,
62}
63
64/// Request context captured from the originating client.
65#[derive(Debug, Clone)]
66struct HttpRequestContext {
67    /// Base URL copied from client options, used to resolve relative `path`.
68    base_url: Option<Url>,
69    /// Whether resolved URLs must avoid IPv6 literal hosts.
70    ipv4_only: bool,
71    /// Client default headers snapshot captured when this request builder was created.
72    default_headers: HeaderMap,
73    /// Client sync header injectors snapshot captured when this request builder was created.
74    injectors: Vec<HttpHeaderInjector>,
75    /// Client async header injectors snapshot captured when this request builder was created.
76    async_injectors: Vec<AsyncHttpHeaderInjector>,
77}
78
79/// Immutable snapshot of a single HTTP call produced by
80/// [`crate::HttpRequestBuilder`].
81#[derive(Debug)]
82pub struct HttpRequest {
83    /// HTTP method (GET, POST, …).
84    method: Method,
85    /// Absolute URL string, or path joined with client `base_url` when not
86    /// parseable as URL.
87    path: String,
88    /// Query string parameters as `(name, value)` pairs.
89    query: Vec<(String, String)>,
90    /// Headers added on top of client defaults and injector output.
91    headers: HeaderMap,
92    /// Serialized body variant.
93    body: HttpRequestBody,
94    /// Deferred per-attempt streaming body factory.
95    streaming_body: Option<HttpRequestStreamingBody>,
96    /// Lazily maintained cache for the currently resolved URL.
97    resolved_url: RwLock<Option<Url>>,
98    /// Attempt-scoped cache of merged outbound headers after applying
99    /// defaults/injectors/request-local headers.
100    effective_headers: Option<HeaderMap>,
101    /// Request execution options and runtime controls.
102    execution_options: HttpRequestExecutionOptions,
103    /// Client-derived context for URL and header resolution.
104    context: HttpRequestContext,
105}
106
107impl HttpRequest {
108    /// Consumes a finished [`HttpRequestBuilder`] and freezes its fields into
109    /// an [`HttpRequest`].
110    ///
111    /// # Parameters
112    /// - `builder`: Populated builder produced by the HTTP client pipeline.
113    ///
114    /// # Returns
115    /// Snapshot ready for URL resolution, header assembly, and sending.
116    pub(super) fn new(builder: HttpRequestBuilder) -> Self {
117        let mut request = Self {
118            method: builder.method,
119            path: builder.path,
120            query: builder.query,
121            headers: builder.headers,
122            body: builder.body,
123            streaming_body: builder.streaming_body,
124            resolved_url: RwLock::new(None),
125            effective_headers: None,
126            execution_options: HttpRequestExecutionOptions {
127                request_timeout: builder.request_timeout,
128                write_timeout: builder.write_timeout,
129                read_timeout: builder.read_timeout,
130                cancellation_token: builder.cancellation_token,
131                retry_override: builder.retry_override,
132            },
133            context: HttpRequestContext {
134                base_url: builder.base_url,
135                ipv4_only: builder.ipv4_only,
136                default_headers: builder.default_headers,
137                injectors: builder.injectors,
138                async_injectors: builder.async_injectors,
139            },
140        };
141        request.refresh_resolved_url_cache();
142        request
143    }
144
145    /// Returns the HTTP verb for this snapshot.
146    ///
147    /// # Returns
148    /// Borrowed [`Method`] (for example GET or POST).
149    pub fn method(&self) -> &Method {
150        &self.method
151    }
152
153    /// Replaces the HTTP verb.
154    ///
155    /// # Parameters
156    /// - `method`: New [`Method`].
157    ///
158    /// # Returns
159    /// `self` for method chaining.
160    pub fn set_method(&mut self, method: Method) -> &mut Self {
161        self.method = method;
162        self
163    }
164
165    /// Returns the path segment or absolute URL string stored on this request.
166    ///
167    /// # Returns
168    /// The raw path/URL before query string assembly; may be relative if a base
169    /// URL is set.
170    pub fn path(&self) -> &str {
171        &self.path
172    }
173
174    /// Replaces the path or absolute URL string.
175    ///
176    /// # Parameters
177    /// - `path`: New path or URL string (query string is managed separately via
178    ///   [`Self::add_query_param`]).
179    ///
180    /// # Returns
181    /// `self` for method chaining.
182    pub fn set_path(&mut self, path: &str) -> &mut Self {
183        self.path = path.to_string();
184        self.refresh_resolved_url_cache();
185        self
186    }
187
188    /// Returns ordered `(name, value)` query pairs that will be appended to the
189    /// resolved URL.
190    ///
191    /// # Returns
192    /// Slice view of accumulated query parameters.
193    pub fn query(&self) -> &[(String, String)] {
194        &self.query
195    }
196
197    /// Appends a single query pair preserving insertion order.
198    ///
199    /// # Parameters
200    /// - `key`: Parameter name.
201    /// - `value`: Parameter value.
202    ///
203    /// # Returns
204    /// `self` for method chaining.
205    pub fn add_query_param(&mut self, key: &str, value: &str) -> &mut Self {
206        self.query.push((key.to_string(), value.to_string()));
207        self
208    }
209
210    /// Removes every query pair from this snapshot.
211    ///
212    /// # Returns
213    /// `self` for method chaining.
214    pub fn clear_query_params(&mut self) -> &mut Self {
215        self.query.clear();
216        self
217    }
218
219    /// Returns request-local headers layered on top of client defaults and
220    /// injector output at send time.
221    ///
222    /// # Returns
223    /// Borrowed [`HeaderMap`] owned by this request only (not merged defaults).
224    pub fn headers(&self) -> &HeaderMap {
225        &self.headers
226    }
227
228    /// Parses and inserts one header from string name/value pairs.
229    ///
230    /// # Parameters
231    /// - `name`: Header field name.
232    /// - `value`: Header field value.
233    ///
234    /// # Returns
235    /// `Ok(self)` on success.
236    ///
237    /// # Errors
238    /// Returns [`HttpError`] when name or value cannot be converted into valid
239    /// HTTP tokens.
240    pub fn set_header(&mut self, name: &str, value: &str) -> Result<&mut Self, HttpError> {
241        let (header_name, header_value) = parse_header(name, value)?;
242        self.headers.insert(header_name, header_value);
243        self.invalidate_effective_headers_cache();
244        Ok(self)
245    }
246
247    /// Inserts one header using pre-validated [`HeaderName`] / [`HeaderValue`]
248    /// types.
249    ///
250    /// # Parameters
251    /// - `name`: Typed header name.
252    /// - `value`: Typed header value.
253    ///
254    /// # Returns
255    /// `self` for method chaining.
256    pub fn set_typed_header(&mut self, name: HeaderName, value: HeaderValue) -> &mut Self {
257        self.headers.insert(name, value);
258        self.invalidate_effective_headers_cache();
259        self
260    }
261
262    /// Removes all values for a header field by typed name.
263    ///
264    /// # Parameters
265    /// - `name`: Header name to strip from the request-local map.
266    ///
267    /// # Returns
268    /// `self` for method chaining.
269    pub fn remove_header(&mut self, name: &HeaderName) -> &mut Self {
270        self.headers.remove(name);
271        self.invalidate_effective_headers_cache();
272        self
273    }
274
275    /// Clears all request-local headers (defaults and injectors are unaffected
276    /// until send).
277    ///
278    /// # Returns
279    /// `self` for method chaining.
280    pub fn clear_headers(&mut self) -> &mut Self {
281        self.headers.clear();
282        self.invalidate_effective_headers_cache();
283        self
284    }
285
286    /// Returns the serialized body variant for this snapshot.
287    ///
288    /// # Returns
289    /// Borrowed [`HttpRequestBody`].
290    pub fn body(&self) -> &HttpRequestBody {
291        &self.body
292    }
293
294    /// Replaces the entire body payload.
295    ///
296    /// # Parameters
297    /// - `body`: New [`HttpRequestBody`] variant.
298    ///
299    /// # Returns
300    /// `self` for method chaining.
301    pub fn set_body(&mut self, body: HttpRequestBody) -> &mut Self {
302        self.body = body;
303        self.streaming_body = None;
304        self
305    }
306
307    /// Sets deferred streaming upload body factory for this request.
308    ///
309    /// # Parameters
310    /// - `streaming_body`: Deferred body stream factory reused across retries.
311    ///
312    /// # Returns
313    /// `self` for method chaining.
314    pub fn set_streaming_body(&mut self, streaming_body: HttpRequestStreamingBody) -> &mut Self {
315        self.streaming_body = Some(streaming_body);
316        self.body = HttpRequestBody::Empty;
317        self
318    }
319
320    /// Returns the per-request total timeout, if any.
321    ///
322    /// # Returns
323    /// `Some(duration)` when a request-specific timeout overrides the client
324    /// default; otherwise `None`.
325    pub fn request_timeout(&self) -> Option<Duration> {
326        self.execution_options.request_timeout
327    }
328
329    /// Sets a per-request total timeout that overrides the client default for
330    /// this send.
331    ///
332    /// # Parameters
333    /// - `timeout`: Upper bound for the entire request lifecycle handled by
334    ///   reqwest.
335    ///
336    /// # Returns
337    /// `self` for method chaining.
338    pub fn set_request_timeout(&mut self, timeout: Duration) -> &mut Self {
339        self.execution_options.request_timeout = Some(timeout);
340        self
341    }
342
343    /// Drops the per-request timeout so the client-wide default applies again.
344    ///
345    /// # Returns
346    /// `self` for method chaining.
347    pub fn clear_request_timeout(&mut self) -> &mut Self {
348        self.execution_options.request_timeout = None;
349        self
350    }
351
352    /// Returns the write-phase timeout used while sending the request.
353    pub fn write_timeout(&self) -> Duration {
354        self.execution_options.write_timeout
355    }
356
357    /// Sets the write-phase timeout used while sending the request.
358    pub fn set_write_timeout(&mut self, timeout: Duration) -> &mut Self {
359        self.execution_options.write_timeout = timeout;
360        self
361    }
362
363    /// Returns the read-phase timeout used while reading response body bytes.
364    pub fn read_timeout(&self) -> Duration {
365        self.execution_options.read_timeout
366    }
367
368    /// Sets the read-phase timeout used while reading response body bytes.
369    pub fn set_read_timeout(&mut self, timeout: Duration) -> &mut Self {
370        self.execution_options.read_timeout = timeout;
371        self
372    }
373
374    /// Returns the optional base URL used to resolve relative [`Self::path`]
375    /// values.
376    ///
377    /// # Returns
378    /// `Some` when a base is configured; `None` when only absolute URLs in
379    /// `path` are valid.
380    pub fn base_url(&self) -> Option<&Url> {
381        self.context.base_url.as_ref()
382    }
383
384    /// Sets the base URL used for internal URL resolution when `path` is not
385    /// absolute.
386    ///
387    /// # Parameters
388    /// - `base_url`: Root URL to join against relative paths.
389    ///
390    /// # Returns
391    /// `self` for method chaining.
392    pub fn set_base_url(&mut self, base_url: Url) -> &mut Self {
393        self.context.base_url = Some(base_url);
394        self.refresh_resolved_url_cache();
395        self
396    }
397
398    /// Removes the configured base URL so relative paths can no longer be
399    /// resolved without resetting it.
400    ///
401    /// # Returns
402    /// `self` for method chaining.
403    pub fn clear_base_url(&mut self) -> &mut Self {
404        self.context.base_url = None;
405        self.refresh_resolved_url_cache();
406        self
407    }
408
409    /// Returns whether IPv6 literal hosts are rejected after URL resolution.
410    ///
411    /// # Returns
412    /// `true` when a resolved URL whose host is an IPv6 literal must be
413    /// rejected with [`HttpError::invalid_url`].
414    pub fn ipv4_only(&self) -> bool {
415        self.context.ipv4_only
416    }
417
418    /// Enables or disables IPv6 literal host rejection for resolved URLs.
419    ///
420    /// # Parameters
421    /// - `enabled`: When `true`, resolved URLs whose host is an IPv6 literal
422    ///   are errors.
423    ///
424    /// # Returns
425    /// `self` for method chaining.
426    pub fn set_ipv4_only(&mut self, enabled: bool) -> &mut Self {
427        self.context.ipv4_only = enabled;
428        self.refresh_resolved_url_cache();
429        self
430    }
431
432    /// Returns the cooperative cancellation handle, if configured.
433    ///
434    /// # Returns
435    /// `Some` token checked before send and during I/O; `None` when
436    /// cancellation is not wired.
437    pub fn cancellation_token(&self) -> Option<&CancellationToken> {
438        self.execution_options.cancellation_token.as_ref()
439    }
440
441    /// Attaches a [`CancellationToken`] that can abort this request
442    /// cooperatively.
443    ///
444    /// # Parameters
445    /// - `token`: Shared cancellation source.
446    ///
447    /// # Returns
448    /// `self` for method chaining.
449    pub fn set_cancellation_token(&mut self, token: CancellationToken) -> &mut Self {
450        self.execution_options.cancellation_token = Some(token);
451        self
452    }
453
454    /// Removes any cancellation token from this snapshot.
455    ///
456    /// # Returns
457    /// `self` for method chaining.
458    pub fn clear_cancellation_token(&mut self) -> &mut Self {
459        self.execution_options.cancellation_token = None;
460        self
461    }
462
463    /// Returns the per-request retry override applied by the client pipeline.
464    ///
465    /// # Returns
466    /// Borrowed [`HttpRequestRetryOverride`].
467    pub fn retry_override(&self) -> &HttpRequestRetryOverride {
468        &self.execution_options.retry_override
469    }
470
471    /// Replaces the retry override for this single request.
472    ///
473    /// # Parameters
474    /// - `retry_override`: New override policy and knobs.
475    ///
476    /// # Returns
477    /// `self` for method chaining.
478    pub fn set_retry_override(&mut self, retry_override: HttpRequestRetryOverride) -> &mut Self {
479        self.execution_options.retry_override = retry_override;
480        self
481    }
482
483    /// Moves the current body out, leaving [`HttpRequestBody::Empty`] in its
484    /// place.
485    ///
486    /// Used internally before handing the payload to reqwest so the snapshot is
487    /// not cloned twice.
488    ///
489    /// # Returns
490    /// Previous [`HttpRequestBody`] value.
491    pub(crate) fn take_body(&mut self) -> HttpRequestBody {
492        std::mem::replace(&mut self.body, HttpRequestBody::Empty)
493    }
494
495    /// Assembles a reqwest [`RequestBuilder`](reqwest::RequestBuilder), applies
496    /// this snapshot's body, then sends with a bounded write phase.
497    ///
498    /// Centralizes send-attempt preparation and transport wiring:
499    /// - invalidates and recomputes effective headers for each attempt;
500    /// - emits request TRACE logs via the provided logger;
501    /// - applies query/timeout/body wiring plus cooperative cancellation and
502    ///   write-timeout handling.
503    ///
504    /// # Parameters
505    /// - `backend`: Shared reqwest client.
506    /// - `logger`: Attempt-scoped request logger.
507    ///
508    /// # Returns
509    /// The successful [`Response`] or a mapped [`HttpError`].
510    ///
511    /// # Errors
512    /// - Cooperative cancellation while waiting on the send future.
513    /// - Transport failures mapped from reqwest.
514    /// - Write timeout when the send future does not complete within
515    ///   `write_timeout`.
516    pub(crate) async fn send_impl(
517        &mut self,
518        backend: &reqwest::Client,
519        logger: &HttpLogger<'_>,
520    ) -> HttpResult<Response> {
521        // Effective headers are cached on the request. Each send attempt must
522        // invalidate and recompute them so injector output and request mutations
523        // are refreshed instead of reusing stale headers from prior attempts.
524        self.invalidate_effective_headers_cache();
525        let method = self.method().clone();
526        let request_url_context = self.resolved_url_with_query().ok();
527        let write_timeout = self.execution_options.write_timeout;
528        let cancellation_token = self.execution_options.cancellation_token.clone();
529        let headers = Self::await_pre_send_future(
530            self.effective_headers(),
531            write_timeout,
532            cancellation_token,
533            &method,
534            request_url_context.as_ref(),
535            "Request cancelled while preparing request",
536            format!(
537                "Write timeout after {:?} while preparing request",
538                write_timeout
539            ),
540        )
541        .await?
542        .clone();
543        let url = self.resolved_url()?;
544        let request_url = self.resolved_url_with_query()?;
545        // Log the request after computing effective headers so TRACE logs
546        // include the same query string used by the actual send path.
547        logger.log_request(self);
548        let mut builder = backend.request(method.clone(), url.clone());
549        builder = builder.headers(headers);
550        if !self.query.is_empty() {
551            builder = builder.query(self.query.as_slice());
552        }
553        if let Some(timeout) = self.execution_options.request_timeout {
554            builder = builder.timeout(timeout);
555        }
556        if let Some(streaming_body) = self.streaming_body.as_ref() {
557            let body = Self::await_pre_send_future(
558                async { Ok(streaming_body.to_reqwest_body().await) },
559                self.execution_options.write_timeout,
560                self.execution_options.cancellation_token.clone(),
561                &method,
562                Some(&request_url),
563                "Request cancelled while preparing streaming request body",
564                format!(
565                    "Write timeout after {:?} while preparing streaming request body",
566                    self.execution_options.write_timeout
567                ),
568            )
569            .await?;
570            builder = builder.body(body);
571        } else {
572            builder = Self::apply_request_body(builder, self.take_body());
573        }
574
575        let send_future =
576            tokio::time::timeout(self.execution_options.write_timeout, builder.send());
577        let next = if let Some(token) = self.execution_options.cancellation_token.as_ref() {
578            tokio::select! {
579                _ = token.cancelled() => {
580                    return Err(HttpError::cancelled("Request cancelled while sending")
581                        .with_method(&method)
582                        .with_url(&request_url));
583                }
584                send_result = send_future => send_result,
585            }
586        } else {
587            send_future.await
588        };
589
590        match next {
591            Ok(Ok(response)) => Ok(response),
592            Ok(Err(error)) => Err(map_reqwest_error(
593                error,
594                HttpErrorKind::Transport,
595                ReqwestErrorPhase::Send,
596                Some(method.clone()),
597                Some(request_url.clone()),
598            )),
599            Err(_) => Err(HttpError::write_timeout(format!(
600                "Write timeout after {:?} while sending request",
601                self.execution_options.write_timeout
602            ))
603            .with_method(&method)
604            .with_url(&request_url)),
605        }
606    }
607
608    /// Waits for one asynchronous pre-send preparation step with cancellation and
609    /// write-timeout handling.
610    ///
611    /// # Parameters
612    /// - `future`: Preparation future, such as async header injection or streaming
613    ///   body factory execution.
614    /// - `write_timeout`: Timeout budget reused for send preparation.
615    /// - `cancellation_token`: Optional request cancellation token.
616    /// - `method`: Request method for error context.
617    /// - `request_url`: Optional resolved request URL for error context.
618    /// - `cancellation_message`: Message used when cancellation wins.
619    /// - `timeout_message`: Message used when timeout wins.
620    ///
621    /// # Returns
622    /// The future output when it completes before cancellation or timeout.
623    ///
624    /// # Errors
625    /// Returns [`HttpErrorKind::Cancelled`] on cancellation,
626    /// [`HttpErrorKind::WriteTimeout`] on timeout, or propagates the future's own
627    /// error.
628    async fn await_pre_send_future<T, F>(
629        future: F,
630        write_timeout: Duration,
631        cancellation_token: Option<CancellationToken>,
632        method: &Method,
633        request_url: Option<&Url>,
634        cancellation_message: &str,
635        timeout_message: String,
636    ) -> HttpResult<T>
637    where
638        F: Future<Output = HttpResult<T>>,
639    {
640        let timed = tokio::time::timeout(write_timeout, future);
641        let next = if let Some(token) = cancellation_token.as_ref() {
642            tokio::select! {
643                _ = token.cancelled() => {
644                    return Err(Self::pre_send_cancelled_error(
645                        cancellation_message,
646                        method,
647                        request_url,
648                    ));
649                }
650                result = timed => result,
651            }
652        } else {
653            timed.await
654        };
655
656        match next {
657            Ok(result) => result,
658            Err(_) => Err(Self::pre_send_write_timeout_error(
659                timeout_message,
660                method,
661                request_url,
662            )),
663        }
664    }
665
666    /// Builds a cancellation error for pre-send preparation.
667    ///
668    /// # Parameters
669    /// - `message`: Cancellation message.
670    /// - `method`: Request method for context.
671    /// - `request_url`: Optional request URL for context.
672    ///
673    /// # Returns
674    /// Cancellation [`HttpError`] with request context attached.
675    fn pre_send_cancelled_error(
676        message: &str,
677        method: &Method,
678        request_url: Option<&Url>,
679    ) -> HttpError {
680        let mut error = HttpError::cancelled(message).with_method(method);
681        if let Some(request_url) = request_url {
682            error = error.with_url(request_url);
683        }
684        error
685    }
686
687    /// Builds a write-timeout error for pre-send preparation.
688    ///
689    /// # Parameters
690    /// - `message`: Timeout message.
691    /// - `method`: Request method for context.
692    /// - `request_url`: Optional request URL for context.
693    ///
694    /// # Returns
695    /// Write-timeout [`HttpError`] with request context attached.
696    fn pre_send_write_timeout_error(
697        message: String,
698        method: &Method,
699        request_url: Option<&Url>,
700    ) -> HttpError {
701        let mut error = HttpError::write_timeout(message).with_method(method);
702        if let Some(request_url) = request_url {
703            error = error.with_url(request_url);
704        }
705        error
706    }
707
708    /// Returns the resolved URL for current request fields, computing and
709    /// caching it on demand.
710    ///
711    /// # Returns
712    /// Resolved [`Url`] value (cloned from cache when already computed).
713    ///
714    /// # Errors
715    /// Returns [`HttpError::invalid_url`] when parsing fails, the base URL is
716    /// missing for a relative path, joining fails, or [`Self::ipv4_only`]
717    /// rejects an IPv6 literal host.
718    pub(crate) fn resolved_url(&self) -> Result<Url, HttpError> {
719        let cached = match self.resolved_url.read() {
720            Ok(guard) => guard.clone(),
721            Err(_) => return Err(HttpError::other("Resolved URL cache read lock poisoned")),
722        };
723        if let Some(url) = cached.as_ref() {
724            return Ok(url.clone());
725        }
726        let resolved = self.compute_resolved_url()?;
727        match self.resolved_url.write() {
728            Ok(mut guard) => *guard = Some(resolved.clone()),
729            Err(_) => return Err(HttpError::other("Resolved URL cache write lock poisoned")),
730        }
731        Ok(resolved)
732    }
733
734    /// Returns the resolved URL plus request-builder query parameters.
735    ///
736    /// This mirrors the URL sent by [`Self::send_impl`]: query pairs already
737    /// present in [`Self::path`] are preserved, and pairs from
738    /// [`Self::query`] are appended in insertion order.
739    ///
740    /// # Returns
741    /// Resolved [`Url`] including query parameters from this request snapshot.
742    ///
743    /// # Errors
744    /// Propagates [`Self::resolved_url`] errors for invalid or unresolved URLs.
745    pub(crate) fn resolved_url_with_query(&self) -> Result<Url, HttpError> {
746        let mut url = self.resolved_url()?;
747        if !self.query.is_empty() {
748            {
749                let mut pairs = url.query_pairs_mut();
750                for (key, value) in &self.query {
751                    pairs.append_pair(key, value);
752                }
753            }
754        }
755        Ok(url)
756    }
757
758    /// Returns cached resolved URL when available.
759    pub fn resolved_url_cached(&self) -> Option<Url> {
760        self.resolved_url
761            .read()
762            .map(|guard| guard.clone())
763            .unwrap_or_default()
764    }
765
766    /// Recomputes and stores the current resolved URL.
767    fn refresh_resolved_url_cache(&mut self) {
768        if let Ok(mut guard) = self.resolved_url.write() {
769            *guard = self.compute_resolved_url().ok();
770        }
771    }
772
773    /// Computes the resolved URL from current path/base/ipv4 settings.
774    fn compute_resolved_url(&self) -> Result<Url, HttpError> {
775        if let Ok(url) = Url::parse(&self.path) {
776            self.validate_resolved_url_host(&url)?;
777            return Ok(url);
778        }
779
780        let base = self.context.base_url.as_ref().ok_or_else(|| {
781            HttpError::invalid_url(format!(
782                "Cannot resolve relative path '{}' without base_url",
783                self.path
784            ))
785        })?;
786
787        let url = base.join(&self.path).map_err(|error| {
788            HttpError::invalid_url(format!(
789                "Failed to resolve path '{}' against base URL '{}': {}",
790                self.path, base, error
791            ))
792        })?;
793        self.validate_resolved_url_host(&url)?;
794        Ok(url)
795    }
796
797    /// Enforces [`Self::ipv4_only`] by rejecting IPv6 literal hosts in `url`.
798    ///
799    /// # Parameters
800    /// - `url`: Candidate URL after parsing or joining.
801    ///
802    /// # Returns
803    /// `Ok(())` when the host is acceptable.
804    ///
805    /// # Errors
806    /// [`HttpError::invalid_url`] when `ipv4_only` is `true` and the host is an
807    /// IPv6 literal.
808    fn validate_resolved_url_host(&self, url: &Url) -> Result<(), HttpError> {
809        if self.context.ipv4_only && matches!(url.host(), Some(Host::Ipv6(_))) {
810            return Err(HttpError::invalid_url(format!(
811                "IPv6 literal host is not allowed when ipv4_only=true: {}",
812                url
813            )));
814        }
815        Ok(())
816    }
817
818    /// Returns the attempt-scoped merged outbound headers.
819    ///
820    /// On first call after invalidation, this computes merged headers by
821    /// replaying defaults/injectors/request-local headers and stores them in
822    /// [`Self::effective_headers`]. Later calls in the same attempt return the
823    /// cached map.
824    ///
825    /// Why this API is async:
826    /// - async injectors are part of header assembly and may perform awaitable
827    ///   work (for example token refresh or other I/O-backed value resolution).
828    /// - therefore header materialization cannot be fully synchronous.
829    ///
830    /// Merge order (later wins on duplicates):
831    /// 1. Client default headers snapshot captured when the builder was
832    ///    created.
833    /// 2. Synchronous injector output in registration order.
834    /// 3. Asynchronous injector output in registration order.
835    /// 4. Request-local headers from this snapshot.
836    ///
837    /// # Returns
838    /// Borrowed merged [`HeaderMap`] from the cache.
839    ///
840    /// # Errors
841    /// Propagates failures returned by any injector's `apply` implementation.
842    pub(crate) async fn effective_headers(&mut self) -> HttpResult<&HeaderMap> {
843        if self.effective_headers.is_none() {
844            self.effective_headers = Some(self.compute_effective_headers().await?);
845        }
846        Ok(self
847            .effective_headers
848            .as_ref()
849            .expect("effective headers cache must be populated after computation"))
850    }
851
852    /// Returns cached merged outbound headers when available.
853    pub(crate) fn effective_headers_cached(&self) -> Option<&HeaderMap> {
854        self.effective_headers.as_ref()
855    }
856
857    /// Clears the effective-header cache.
858    ///
859    /// This method invalidates [`Self::effective_headers`] so the next call to
860    /// [`Self::effective_headers`] recomputes merged headers by re-running
861    /// defaults and header injectors.
862    ///
863    /// Why this is needed:
864    /// - request-local headers may have been mutated (`set_header`, `clear_headers`, etc.);
865    /// - injector output may be time-sensitive (for example rotating auth token
866    ///   or timestamp-based signatures), so each send attempt should recompute
867    ///   merged headers instead of reusing stale values from prior attempts.
868    ///
869    /// When to call:
870    /// - immediately before starting a new send attempt;
871    /// - after any mutation that can change final outbound headers.
872    pub(crate) fn invalidate_effective_headers_cache(&mut self) {
873        self.effective_headers = None;
874    }
875
876    /// Computes merged outbound headers without touching the cache.
877    async fn compute_effective_headers(&self) -> HttpResult<HeaderMap> {
878        let mut headers = self.context.default_headers.clone();
879
880        for injector in &self.context.injectors {
881            injector.apply(&mut headers)?;
882        }
883        for injector in &self.context.async_injectors {
884            injector.apply(&mut headers).await?;
885        }
886
887        headers.extend(self.headers.clone());
888        Ok(headers)
889    }
890
891    /// Returns a pre-cancelled [`HttpError`] when a token is present and
892    /// already cancelled.
893    ///
894    /// # Parameters
895    /// - `message`: Human-readable cancellation reason.
896    ///
897    /// # Returns
898    /// `Some` [`HttpError`] (including method context and cached URL when
899    /// available) when a token exists and is already cancelled; otherwise
900    /// `None`.
901    pub(crate) fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
902        if self
903            .execution_options
904            .cancellation_token
905            .as_ref()
906            .is_some_and(CancellationToken::is_cancelled)
907        {
908            let mut error = HttpError::cancelled(message.to_string()).with_method(&self.method);
909            if let Ok(url) = self.resolved_url_with_query() {
910                error = error.with_url(&url);
911            }
912            Some(error)
913        } else {
914            None
915        }
916    }
917
918    /// Attaches the correct reqwest body encoding for each [`HttpRequestBody`]
919    /// variant.
920    ///
921    /// # Parameters
922    /// - `builder`: Partially configured [`reqwest::RequestBuilder`]
923    ///   (method/URL/headers already set).
924    /// - `body`: Payload variant to attach; moved into the builder.
925    ///
926    /// # Returns
927    /// The same builder with an appropriate `.body(...)` applied (or unchanged
928    /// for [`HttpRequestBody::Empty`]).
929    fn apply_request_body(
930        builder: reqwest::RequestBuilder,
931        body: HttpRequestBody,
932    ) -> reqwest::RequestBuilder {
933        match body {
934            HttpRequestBody::Empty => builder,
935            HttpRequestBody::Bytes(bytes)
936            | HttpRequestBody::Json(bytes)
937            | HttpRequestBody::Form(bytes)
938            | HttpRequestBody::Multipart(bytes)
939            | HttpRequestBody::Ndjson(bytes) => builder.body(bytes),
940            HttpRequestBody::Stream(chunks) => {
941                let body_stream = futures_stream::iter(
942                    chunks.into_iter().map(Result::<Bytes, std::io::Error>::Ok),
943                );
944                builder.body(reqwest::Body::wrap_stream(body_stream))
945            }
946            HttpRequestBody::Text(text) => builder.body(text),
947        }
948    }
949}
950
951impl Clone for HttpRequest {
952    fn clone(&self) -> Self {
953        Self {
954            method: self.method.clone(),
955            path: self.path.clone(),
956            query: self.query.clone(),
957            headers: self.headers.clone(),
958            body: self.body.clone(),
959            streaming_body: self.streaming_body.clone(),
960            resolved_url: RwLock::new(self.resolved_url_cached()),
961            effective_headers: self.effective_headers.clone(),
962            execution_options: self.execution_options.clone(),
963            context: self.context.clone(),
964        }
965    }
966}