qubit_http/request/http_request.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9//! Immutable HTTP request object.
10
11use std::future::Future;
12use std::sync::RwLock;
13use std::time::Duration;
14
15use bytes::Bytes;
16use futures_util::stream as futures_stream;
17use http::{HeaderMap, HeaderName, HeaderValue, Method};
18use qubit_function::MutatingFunction;
19use reqwest::Response;
20use tokio_util::sync::CancellationToken;
21use url::Host;
22use url::Url;
23
24use crate::error::{backend_error_mapper::map_reqwest_error, ReqwestErrorPhase};
25use crate::{
26 AsyncHttpHeaderInjector, HttpError, HttpErrorKind, HttpHeaderInjector, HttpLogger,
27 HttpRequestStreamingBody, HttpResult,
28};
29
30use super::http_request_body::HttpRequestBody;
31use super::http_request_builder::HttpRequestBuilder;
32use super::http_request_retry_override::HttpRequestRetryOverride;
33use super::parse_header;
34
35/// Request execution options (timeouts, cancellation, and retry override).
36#[derive(Debug, Clone)]
37struct HttpRequestExecutionOptions {
38 /// Overrides client-wide request timeout when set; otherwise client default applies.
39 request_timeout: Option<Duration>,
40 /// Per-request write timeout used during request sending.
41 write_timeout: Duration,
42 /// Per-request read timeout used during response body reads.
43 read_timeout: Duration,
44 /// Optional cancellation token checked before send and during I/O phases.
45 cancellation_token: Option<CancellationToken>,
46 /// Per-request retry override (enable/disable/method-policy/Retry-After behavior).
47 retry_override: HttpRequestRetryOverride,
48}
49
50/// Request context captured from the originating client.
51#[derive(Debug, Clone)]
52struct HttpRequestContext {
53 /// Base URL copied from client options, used to resolve relative `path`.
54 base_url: Option<Url>,
55 /// Whether resolved URLs must avoid IPv6 literal hosts.
56 ipv4_only: bool,
57 /// Client default headers snapshot captured when this request builder was created.
58 default_headers: HeaderMap,
59 /// Client sync header injectors snapshot captured when this request builder was created.
60 injectors: Vec<HttpHeaderInjector>,
61 /// Client async header injectors snapshot captured when this request builder was created.
62 async_injectors: Vec<AsyncHttpHeaderInjector>,
63}
64
65/// Immutable snapshot of a single HTTP call produced by
66/// [`crate::HttpRequestBuilder`].
67#[derive(Debug)]
68pub struct HttpRequest {
69 /// HTTP method (GET, POST, …).
70 method: Method,
71 /// Absolute URL string, or path joined with client `base_url` when not
72 /// parseable as URL.
73 path: String,
74 /// Query string parameters as `(name, value)` pairs.
75 query: Vec<(String, String)>,
76 /// Headers added on top of client defaults and injector output.
77 headers: HeaderMap,
78 /// Serialized body variant.
79 body: HttpRequestBody,
80 /// Deferred per-attempt streaming body factory.
81 streaming_body: Option<HttpRequestStreamingBody>,
82 /// Lazily maintained cache for the currently resolved URL.
83 resolved_url: RwLock<Option<Url>>,
84 /// Attempt-scoped cache of merged outbound headers after applying
85 /// defaults/injectors/request-local headers.
86 effective_headers: Option<HeaderMap>,
87 /// Request execution options and runtime controls.
88 execution_options: HttpRequestExecutionOptions,
89 /// Client-derived context for URL and header resolution.
90 context: HttpRequestContext,
91}
92
93impl HttpRequest {
94 /// Consumes a finished [`HttpRequestBuilder`] and freezes its fields into
95 /// an [`HttpRequest`].
96 ///
97 /// # Parameters
98 /// - `builder`: Populated builder produced by the HTTP client pipeline.
99 ///
100 /// # Returns
101 /// Snapshot ready for URL resolution, header assembly, and sending.
102 pub(super) fn new(builder: HttpRequestBuilder) -> Self {
103 let mut request = Self {
104 method: builder.method,
105 path: builder.path,
106 query: builder.query,
107 headers: builder.headers,
108 body: builder.body,
109 streaming_body: builder.streaming_body,
110 resolved_url: RwLock::new(None),
111 effective_headers: None,
112 execution_options: HttpRequestExecutionOptions {
113 request_timeout: builder.request_timeout,
114 write_timeout: builder.write_timeout,
115 read_timeout: builder.read_timeout,
116 cancellation_token: builder.cancellation_token,
117 retry_override: builder.retry_override,
118 },
119 context: HttpRequestContext {
120 base_url: builder.base_url,
121 ipv4_only: builder.ipv4_only,
122 default_headers: builder.default_headers,
123 injectors: builder.injectors,
124 async_injectors: builder.async_injectors,
125 },
126 };
127 request.refresh_resolved_url_cache();
128 request
129 }
130
131 /// Returns the HTTP verb for this snapshot.
132 ///
133 /// # Returns
134 /// Borrowed [`Method`] (for example GET or POST).
135 pub fn method(&self) -> &Method {
136 &self.method
137 }
138
139 /// Replaces the HTTP verb.
140 ///
141 /// # Parameters
142 /// - `method`: New [`Method`].
143 ///
144 /// # Returns
145 /// `self` for method chaining.
146 pub fn set_method(&mut self, method: Method) -> &mut Self {
147 self.method = method;
148 self
149 }
150
151 /// Returns the path segment or absolute URL string stored on this request.
152 ///
153 /// # Returns
154 /// The raw path/URL before query string assembly; may be relative if a base
155 /// URL is set.
156 pub fn path(&self) -> &str {
157 &self.path
158 }
159
160 /// Replaces the path or absolute URL string.
161 ///
162 /// # Parameters
163 /// - `path`: New path or URL string (query string is managed separately via
164 /// [`Self::add_query_param`]).
165 ///
166 /// # Returns
167 /// `self` for method chaining.
168 pub fn set_path(&mut self, path: &str) -> &mut Self {
169 self.path = path.to_string();
170 self.refresh_resolved_url_cache();
171 self
172 }
173
174 /// Returns ordered `(name, value)` query pairs that will be appended to the
175 /// resolved URL.
176 ///
177 /// # Returns
178 /// Slice view of accumulated query parameters.
179 pub fn query(&self) -> &[(String, String)] {
180 &self.query
181 }
182
183 /// Appends a single query pair preserving insertion order.
184 ///
185 /// # Parameters
186 /// - `key`: Parameter name.
187 /// - `value`: Parameter value.
188 ///
189 /// # Returns
190 /// `self` for method chaining.
191 pub fn add_query_param(&mut self, key: &str, value: &str) -> &mut Self {
192 self.query.push((key.to_string(), value.to_string()));
193 self
194 }
195
196 /// Removes every query pair from this snapshot.
197 ///
198 /// # Returns
199 /// `self` for method chaining.
200 pub fn clear_query_params(&mut self) -> &mut Self {
201 self.query.clear();
202 self
203 }
204
205 /// Returns request-local headers layered on top of client defaults and
206 /// injector output at send time.
207 ///
208 /// # Returns
209 /// Borrowed [`HeaderMap`] owned by this request only (not merged defaults).
210 pub fn headers(&self) -> &HeaderMap {
211 &self.headers
212 }
213
214 /// Parses and inserts one header from string name/value pairs.
215 ///
216 /// # Parameters
217 /// - `name`: Header field name.
218 /// - `value`: Header field value.
219 ///
220 /// # Returns
221 /// `Ok(self)` on success.
222 ///
223 /// # Errors
224 /// Returns [`HttpError`] when name or value cannot be converted into valid
225 /// HTTP tokens.
226 pub fn set_header(&mut self, name: &str, value: &str) -> Result<&mut Self, HttpError> {
227 let (header_name, header_value) = parse_header(name, value)?;
228 self.headers.insert(header_name, header_value);
229 self.invalidate_effective_headers_cache();
230 Ok(self)
231 }
232
233 /// Inserts one header using pre-validated [`HeaderName`] / [`HeaderValue`]
234 /// types.
235 ///
236 /// # Parameters
237 /// - `name`: Typed header name.
238 /// - `value`: Typed header value.
239 ///
240 /// # Returns
241 /// `self` for method chaining.
242 pub fn set_typed_header(&mut self, name: HeaderName, value: HeaderValue) -> &mut Self {
243 self.headers.insert(name, value);
244 self.invalidate_effective_headers_cache();
245 self
246 }
247
248 /// Removes all values for a header field by typed name.
249 ///
250 /// # Parameters
251 /// - `name`: Header name to strip from the request-local map.
252 ///
253 /// # Returns
254 /// `self` for method chaining.
255 pub fn remove_header(&mut self, name: &HeaderName) -> &mut Self {
256 self.headers.remove(name);
257 self.invalidate_effective_headers_cache();
258 self
259 }
260
261 /// Clears all request-local headers (defaults and injectors are unaffected
262 /// until send).
263 ///
264 /// # Returns
265 /// `self` for method chaining.
266 pub fn clear_headers(&mut self) -> &mut Self {
267 self.headers.clear();
268 self.invalidate_effective_headers_cache();
269 self
270 }
271
272 /// Returns the serialized body variant for this snapshot.
273 ///
274 /// # Returns
275 /// Borrowed [`HttpRequestBody`].
276 pub fn body(&self) -> &HttpRequestBody {
277 &self.body
278 }
279
280 /// Replaces the entire body payload.
281 ///
282 /// # Parameters
283 /// - `body`: New [`HttpRequestBody`] variant.
284 ///
285 /// # Returns
286 /// `self` for method chaining.
287 pub fn set_body(&mut self, body: HttpRequestBody) -> &mut Self {
288 self.body = body;
289 self.streaming_body = None;
290 self
291 }
292
293 /// Sets deferred streaming upload body factory for this request.
294 ///
295 /// # Parameters
296 /// - `streaming_body`: Deferred body stream factory reused across retries.
297 ///
298 /// # Returns
299 /// `self` for method chaining.
300 pub fn set_streaming_body(&mut self, streaming_body: HttpRequestStreamingBody) -> &mut Self {
301 self.streaming_body = Some(streaming_body);
302 self.body = HttpRequestBody::Empty;
303 self
304 }
305
306 /// Returns the per-request total timeout, if any.
307 ///
308 /// # Returns
309 /// `Some(duration)` when a request-specific timeout overrides the client
310 /// default; otherwise `None`.
311 pub fn request_timeout(&self) -> Option<Duration> {
312 self.execution_options.request_timeout
313 }
314
315 /// Sets a per-request total timeout that overrides the client default for
316 /// this send.
317 ///
318 /// # Parameters
319 /// - `timeout`: Upper bound for the entire request lifecycle handled by
320 /// reqwest.
321 ///
322 /// # Returns
323 /// `self` for method chaining.
324 pub fn set_request_timeout(&mut self, timeout: Duration) -> &mut Self {
325 self.execution_options.request_timeout = Some(timeout);
326 self
327 }
328
329 /// Drops the per-request timeout so the client-wide default applies again.
330 ///
331 /// # Returns
332 /// `self` for method chaining.
333 pub fn clear_request_timeout(&mut self) -> &mut Self {
334 self.execution_options.request_timeout = None;
335 self
336 }
337
338 /// Returns the write-phase timeout used while sending the request.
339 pub fn write_timeout(&self) -> Duration {
340 self.execution_options.write_timeout
341 }
342
343 /// Sets the write-phase timeout used while sending the request.
344 pub fn set_write_timeout(&mut self, timeout: Duration) -> &mut Self {
345 self.execution_options.write_timeout = timeout;
346 self
347 }
348
349 /// Returns the read-phase timeout used while reading response body bytes.
350 pub fn read_timeout(&self) -> Duration {
351 self.execution_options.read_timeout
352 }
353
354 /// Sets the read-phase timeout used while reading response body bytes.
355 pub fn set_read_timeout(&mut self, timeout: Duration) -> &mut Self {
356 self.execution_options.read_timeout = timeout;
357 self
358 }
359
360 /// Returns the optional base URL used to resolve relative [`Self::path`]
361 /// values.
362 ///
363 /// # Returns
364 /// `Some` when a base is configured; `None` when only absolute URLs in
365 /// `path` are valid.
366 pub fn base_url(&self) -> Option<&Url> {
367 self.context.base_url.as_ref()
368 }
369
370 /// Sets the base URL used for internal URL resolution when `path` is not
371 /// absolute.
372 ///
373 /// # Parameters
374 /// - `base_url`: Root URL to join against relative paths.
375 ///
376 /// # Returns
377 /// `self` for method chaining.
378 pub fn set_base_url(&mut self, base_url: Url) -> &mut Self {
379 self.context.base_url = Some(base_url);
380 self.refresh_resolved_url_cache();
381 self
382 }
383
384 /// Removes the configured base URL so relative paths can no longer be
385 /// resolved without resetting it.
386 ///
387 /// # Returns
388 /// `self` for method chaining.
389 pub fn clear_base_url(&mut self) -> &mut Self {
390 self.context.base_url = None;
391 self.refresh_resolved_url_cache();
392 self
393 }
394
395 /// Returns whether IPv6 literal hosts are rejected after URL resolution.
396 ///
397 /// # Returns
398 /// `true` when a resolved URL whose host is an IPv6 literal must be
399 /// rejected with [`HttpError::invalid_url`].
400 pub fn ipv4_only(&self) -> bool {
401 self.context.ipv4_only
402 }
403
404 /// Enables or disables IPv6 literal host rejection for resolved URLs.
405 ///
406 /// # Parameters
407 /// - `enabled`: When `true`, resolved URLs whose host is an IPv6 literal
408 /// are errors.
409 ///
410 /// # Returns
411 /// `self` for method chaining.
412 pub fn set_ipv4_only(&mut self, enabled: bool) -> &mut Self {
413 self.context.ipv4_only = enabled;
414 self.refresh_resolved_url_cache();
415 self
416 }
417
418 /// Returns the cooperative cancellation handle, if configured.
419 ///
420 /// # Returns
421 /// `Some` token checked before send and during I/O; `None` when
422 /// cancellation is not wired.
423 pub fn cancellation_token(&self) -> Option<&CancellationToken> {
424 self.execution_options.cancellation_token.as_ref()
425 }
426
427 /// Attaches a [`CancellationToken`] that can abort this request
428 /// cooperatively.
429 ///
430 /// # Parameters
431 /// - `token`: Shared cancellation source.
432 ///
433 /// # Returns
434 /// `self` for method chaining.
435 pub fn set_cancellation_token(&mut self, token: CancellationToken) -> &mut Self {
436 self.execution_options.cancellation_token = Some(token);
437 self
438 }
439
440 /// Removes any cancellation token from this snapshot.
441 ///
442 /// # Returns
443 /// `self` for method chaining.
444 pub fn clear_cancellation_token(&mut self) -> &mut Self {
445 self.execution_options.cancellation_token = None;
446 self
447 }
448
449 /// Returns the per-request retry override applied by the client pipeline.
450 ///
451 /// # Returns
452 /// Borrowed [`HttpRequestRetryOverride`].
453 pub fn retry_override(&self) -> &HttpRequestRetryOverride {
454 &self.execution_options.retry_override
455 }
456
457 /// Replaces the retry override for this single request.
458 ///
459 /// # Parameters
460 /// - `retry_override`: New override policy and knobs.
461 ///
462 /// # Returns
463 /// `self` for method chaining.
464 pub fn set_retry_override(&mut self, retry_override: HttpRequestRetryOverride) -> &mut Self {
465 self.execution_options.retry_override = retry_override;
466 self
467 }
468
469 /// Moves the current body out, leaving [`HttpRequestBody::Empty`] in its
470 /// place.
471 ///
472 /// Used internally before handing the payload to reqwest so the snapshot is
473 /// not cloned twice.
474 ///
475 /// # Returns
476 /// Previous [`HttpRequestBody`] value.
477 pub(crate) fn take_body(&mut self) -> HttpRequestBody {
478 std::mem::replace(&mut self.body, HttpRequestBody::Empty)
479 }
480
481 /// Assembles a reqwest [`RequestBuilder`](reqwest::RequestBuilder), applies
482 /// this snapshot's body, then sends with a bounded write phase.
483 ///
484 /// Centralizes send-attempt preparation and transport wiring:
485 /// - invalidates and recomputes effective headers for each attempt;
486 /// - emits request TRACE logs via the provided logger;
487 /// - applies query/timeout/body wiring plus cooperative cancellation and
488 /// write-timeout handling.
489 ///
490 /// # Parameters
491 /// - `backend`: Shared reqwest client.
492 /// - `logger`: Attempt-scoped request logger.
493 ///
494 /// # Returns
495 /// The successful [`Response`] or a mapped [`HttpError`].
496 ///
497 /// # Errors
498 /// - Cooperative cancellation while waiting on the send future.
499 /// - Transport failures mapped from reqwest.
500 /// - Write timeout when the send future does not complete within
501 /// `write_timeout`.
502 pub(crate) async fn send_impl(
503 &mut self,
504 backend: &reqwest::Client,
505 logger: &HttpLogger<'_>,
506 ) -> HttpResult<Response> {
507 // Effective headers are cached on the request. Each send attempt must
508 // invalidate and recompute them so injector output and request mutations
509 // are refreshed instead of reusing stale headers from prior attempts.
510 self.invalidate_effective_headers_cache();
511 let method = self.method().clone();
512 let request_url_context = self.resolved_url_with_query().ok();
513 let write_timeout = self.execution_options.write_timeout;
514 let cancellation_token = self.execution_options.cancellation_token.clone();
515 let headers = Self::await_pre_send_future(
516 self.effective_headers(),
517 write_timeout,
518 cancellation_token,
519 &method,
520 request_url_context.as_ref(),
521 "Request cancelled while preparing request",
522 format!(
523 "Write timeout after {:?} while preparing request",
524 write_timeout
525 ),
526 )
527 .await?
528 .clone();
529 let url = self.resolved_url()?;
530 let request_url = self.resolved_url_with_query()?;
531 // Log the request after computing effective headers so TRACE logs
532 // include the same query string used by the actual send path.
533 logger.log_request(self);
534 let mut builder = backend.request(method.clone(), url.clone());
535 builder = builder.headers(headers);
536 if !self.query.is_empty() {
537 builder = builder.query(self.query.as_slice());
538 }
539 if let Some(timeout) = self.execution_options.request_timeout {
540 builder = builder.timeout(timeout);
541 }
542 if let Some(streaming_body) = self.streaming_body.as_ref() {
543 let body = Self::await_pre_send_future(
544 async { Ok(streaming_body.to_reqwest_body().await) },
545 self.execution_options.write_timeout,
546 self.execution_options.cancellation_token.clone(),
547 &method,
548 Some(&request_url),
549 "Request cancelled while preparing streaming request body",
550 format!(
551 "Write timeout after {:?} while preparing streaming request body",
552 self.execution_options.write_timeout
553 ),
554 )
555 .await?;
556 builder = builder.body(body);
557 } else {
558 builder = Self::apply_request_body(builder, self.take_body());
559 }
560
561 let send_future =
562 tokio::time::timeout(self.execution_options.write_timeout, builder.send());
563 let next = if let Some(token) = self.execution_options.cancellation_token.as_ref() {
564 tokio::select! {
565 _ = token.cancelled() => {
566 return Err(HttpError::cancelled("Request cancelled while sending")
567 .with_method(&method)
568 .with_url(&request_url));
569 }
570 send_result = send_future => send_result,
571 }
572 } else {
573 send_future.await
574 };
575
576 match next {
577 Ok(Ok(response)) => Ok(response),
578 Ok(Err(error)) => Err(map_reqwest_error(
579 error,
580 HttpErrorKind::Transport,
581 Some(ReqwestErrorPhase::Send),
582 Some(method.clone()),
583 Some(request_url.clone()),
584 )),
585 Err(_) => Err(HttpError::write_timeout(format!(
586 "Write timeout after {:?} while sending request",
587 self.execution_options.write_timeout
588 ))
589 .with_method(&method)
590 .with_url(&request_url)),
591 }
592 }
593
594 /// Waits for one asynchronous pre-send preparation step with cancellation and
595 /// write-timeout handling.
596 ///
597 /// # Parameters
598 /// - `future`: Preparation future, such as async header injection or streaming
599 /// body factory execution.
600 /// - `write_timeout`: Timeout budget reused for send preparation.
601 /// - `cancellation_token`: Optional request cancellation token.
602 /// - `method`: Request method for error context.
603 /// - `request_url`: Optional resolved request URL for error context.
604 /// - `cancellation_message`: Message used when cancellation wins.
605 /// - `timeout_message`: Message used when timeout wins.
606 ///
607 /// # Returns
608 /// The future output when it completes before cancellation or timeout.
609 ///
610 /// # Errors
611 /// Returns [`HttpErrorKind::Cancelled`] on cancellation,
612 /// [`HttpErrorKind::WriteTimeout`] on timeout, or propagates the future's own
613 /// error.
614 async fn await_pre_send_future<T, F>(
615 future: F,
616 write_timeout: Duration,
617 cancellation_token: Option<CancellationToken>,
618 method: &Method,
619 request_url: Option<&Url>,
620 cancellation_message: &str,
621 timeout_message: String,
622 ) -> HttpResult<T>
623 where
624 F: Future<Output = HttpResult<T>>,
625 {
626 let timed = tokio::time::timeout(write_timeout, future);
627 let next = if let Some(token) = cancellation_token.as_ref() {
628 tokio::select! {
629 _ = token.cancelled() => {
630 return Err(Self::pre_send_cancelled_error(
631 cancellation_message,
632 method,
633 request_url,
634 ));
635 }
636 result = timed => result,
637 }
638 } else {
639 timed.await
640 };
641
642 match next {
643 Ok(result) => result,
644 Err(_) => Err(Self::pre_send_write_timeout_error(
645 timeout_message,
646 method,
647 request_url,
648 )),
649 }
650 }
651
652 /// Builds a cancellation error for pre-send preparation.
653 ///
654 /// # Parameters
655 /// - `message`: Cancellation message.
656 /// - `method`: Request method for context.
657 /// - `request_url`: Optional request URL for context.
658 ///
659 /// # Returns
660 /// Cancellation [`HttpError`] with request context attached.
661 fn pre_send_cancelled_error(
662 message: &str,
663 method: &Method,
664 request_url: Option<&Url>,
665 ) -> HttpError {
666 let mut error = HttpError::cancelled(message).with_method(method);
667 if let Some(request_url) = request_url {
668 error = error.with_url(request_url);
669 }
670 error
671 }
672
673 /// Builds a write-timeout error for pre-send preparation.
674 ///
675 /// # Parameters
676 /// - `message`: Timeout message.
677 /// - `method`: Request method for context.
678 /// - `request_url`: Optional request URL for context.
679 ///
680 /// # Returns
681 /// Write-timeout [`HttpError`] with request context attached.
682 fn pre_send_write_timeout_error(
683 message: String,
684 method: &Method,
685 request_url: Option<&Url>,
686 ) -> HttpError {
687 let mut error = HttpError::write_timeout(message).with_method(method);
688 if let Some(request_url) = request_url {
689 error = error.with_url(request_url);
690 }
691 error
692 }
693
694 /// Returns the resolved URL for current request fields, computing and
695 /// caching it on demand.
696 ///
697 /// # Returns
698 /// Resolved [`Url`] value (cloned from cache when already computed).
699 ///
700 /// # Errors
701 /// Returns [`HttpError::invalid_url`] when parsing fails, the base URL is
702 /// missing for a relative path, joining fails, or [`Self::ipv4_only`]
703 /// rejects an IPv6 literal host.
704 pub(crate) fn resolved_url(&self) -> Result<Url, HttpError> {
705 let cached = match self.resolved_url.read() {
706 Ok(guard) => guard.clone(),
707 Err(_) => return Err(HttpError::other("Resolved URL cache read lock poisoned")),
708 };
709 if let Some(url) = cached.as_ref() {
710 return Ok(url.clone());
711 }
712 let resolved = self.compute_resolved_url()?;
713 match self.resolved_url.write() {
714 Ok(mut guard) => *guard = Some(resolved.clone()),
715 Err(_) => return Err(HttpError::other("Resolved URL cache write lock poisoned")),
716 }
717 Ok(resolved)
718 }
719
720 /// Returns the resolved URL plus request-builder query parameters.
721 ///
722 /// This mirrors the URL sent by [`Self::send_impl`]: query pairs already
723 /// present in [`Self::path`] are preserved, and pairs from
724 /// [`Self::query`] are appended in insertion order.
725 ///
726 /// # Returns
727 /// Resolved [`Url`] including query parameters from this request snapshot.
728 ///
729 /// # Errors
730 /// Propagates [`Self::resolved_url`] errors for invalid or unresolved URLs.
731 pub(crate) fn resolved_url_with_query(&self) -> Result<Url, HttpError> {
732 let mut url = self.resolved_url()?;
733 if !self.query.is_empty() {
734 {
735 let mut pairs = url.query_pairs_mut();
736 for (key, value) in &self.query {
737 pairs.append_pair(key, value);
738 }
739 }
740 }
741 Ok(url)
742 }
743
744 /// Returns cached resolved URL when available.
745 pub fn resolved_url_cached(&self) -> Option<Url> {
746 self.resolved_url
747 .read()
748 .map(|guard| guard.clone())
749 .unwrap_or_default()
750 }
751
752 /// Recomputes and stores the current resolved URL.
753 fn refresh_resolved_url_cache(&mut self) {
754 if let Ok(mut guard) = self.resolved_url.write() {
755 *guard = self.compute_resolved_url().ok();
756 }
757 }
758
759 /// Computes the resolved URL from current path/base/ipv4 settings.
760 fn compute_resolved_url(&self) -> Result<Url, HttpError> {
761 if let Ok(url) = Url::parse(&self.path) {
762 self.validate_resolved_url_host(&url)?;
763 return Ok(url);
764 }
765
766 let base = self.context.base_url.as_ref().ok_or_else(|| {
767 HttpError::invalid_url(format!(
768 "Cannot resolve relative path '{}' without base_url",
769 self.path
770 ))
771 })?;
772
773 let url = base.join(&self.path).map_err(|error| {
774 HttpError::invalid_url(format!(
775 "Failed to resolve path '{}' against base URL '{}': {}",
776 self.path, base, error
777 ))
778 })?;
779 self.validate_resolved_url_host(&url)?;
780 Ok(url)
781 }
782
783 /// Enforces [`Self::ipv4_only`] by rejecting IPv6 literal hosts in `url`.
784 ///
785 /// # Parameters
786 /// - `url`: Candidate URL after parsing or joining.
787 ///
788 /// # Returns
789 /// `Ok(())` when the host is acceptable.
790 ///
791 /// # Errors
792 /// [`HttpError::invalid_url`] when `ipv4_only` is `true` and the host is an
793 /// IPv6 literal.
794 fn validate_resolved_url_host(&self, url: &Url) -> Result<(), HttpError> {
795 if self.context.ipv4_only && matches!(url.host(), Some(Host::Ipv6(_))) {
796 return Err(HttpError::invalid_url(format!(
797 "IPv6 literal host is not allowed when ipv4_only=true: {}",
798 url
799 )));
800 }
801 Ok(())
802 }
803
804 /// Returns the attempt-scoped merged outbound headers.
805 ///
806 /// On first call after invalidation, this computes merged headers by
807 /// replaying defaults/injectors/request-local headers and stores them in
808 /// [`Self::effective_headers`]. Later calls in the same attempt return the
809 /// cached map.
810 ///
811 /// Why this API is async:
812 /// - async injectors are part of header assembly and may perform awaitable
813 /// work (for example token refresh or other I/O-backed value resolution).
814 /// - therefore header materialization cannot be fully synchronous.
815 ///
816 /// Merge order (later wins on duplicates):
817 /// 1. Client default headers snapshot captured when the builder was
818 /// created.
819 /// 2. Synchronous injector output in registration order.
820 /// 3. Asynchronous injector output in registration order.
821 /// 4. Request-local headers from this snapshot.
822 ///
823 /// # Returns
824 /// Borrowed merged [`HeaderMap`] from the cache.
825 ///
826 /// # Errors
827 /// Propagates failures returned by any injector's `apply` implementation.
828 pub(crate) async fn effective_headers(&mut self) -> HttpResult<&HeaderMap> {
829 if self.effective_headers.is_none() {
830 self.effective_headers = Some(self.compute_effective_headers().await?);
831 }
832 Ok(self
833 .effective_headers
834 .as_ref()
835 .expect("effective headers cache must be populated after computation"))
836 }
837
838 /// Returns cached merged outbound headers when available.
839 pub(crate) fn effective_headers_cached(&self) -> Option<&HeaderMap> {
840 self.effective_headers.as_ref()
841 }
842
843 /// Clears the effective-header cache.
844 ///
845 /// This method invalidates [`Self::effective_headers`] so the next call to
846 /// [`Self::effective_headers`] recomputes merged headers by re-running
847 /// defaults and header injectors.
848 ///
849 /// Why this is needed:
850 /// - request-local headers may have been mutated (`set_header`, `clear_headers`, etc.);
851 /// - injector output may be time-sensitive (for example rotating auth token
852 /// or timestamp-based signatures), so each send attempt should recompute
853 /// merged headers instead of reusing stale values from prior attempts.
854 ///
855 /// When to call:
856 /// - immediately before starting a new send attempt;
857 /// - after any mutation that can change final outbound headers.
858 pub(crate) fn invalidate_effective_headers_cache(&mut self) {
859 self.effective_headers = None;
860 }
861
862 /// Computes merged outbound headers without touching the cache.
863 async fn compute_effective_headers(&self) -> HttpResult<HeaderMap> {
864 let mut headers = self.context.default_headers.clone();
865
866 for injector in &self.context.injectors {
867 injector.apply(&mut headers)?;
868 }
869 for injector in &self.context.async_injectors {
870 injector.apply(&mut headers).await?;
871 }
872
873 headers.extend(self.headers.clone());
874 Ok(headers)
875 }
876
877 /// Returns a pre-cancelled [`HttpError`] when a token is present and
878 /// already cancelled.
879 ///
880 /// # Parameters
881 /// - `message`: Human-readable cancellation reason.
882 ///
883 /// # Returns
884 /// `Some` [`HttpError`] (including method context and cached URL when
885 /// available) when a token exists and is already cancelled; otherwise
886 /// `None`.
887 pub(crate) fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
888 if self
889 .execution_options
890 .cancellation_token
891 .as_ref()
892 .is_some_and(CancellationToken::is_cancelled)
893 {
894 let mut error = HttpError::cancelled(message.to_string()).with_method(&self.method);
895 if let Ok(url) = self.resolved_url_with_query() {
896 error = error.with_url(&url);
897 }
898 Some(error)
899 } else {
900 None
901 }
902 }
903
904 /// Attaches the correct reqwest body encoding for each [`HttpRequestBody`]
905 /// variant.
906 ///
907 /// # Parameters
908 /// - `builder`: Partially configured [`reqwest::RequestBuilder`]
909 /// (method/URL/headers already set).
910 /// - `body`: Payload variant to attach; moved into the builder.
911 ///
912 /// # Returns
913 /// The same builder with an appropriate `.body(...)` applied (or unchanged
914 /// for [`HttpRequestBody::Empty`]).
915 fn apply_request_body(
916 builder: reqwest::RequestBuilder,
917 body: HttpRequestBody,
918 ) -> reqwest::RequestBuilder {
919 match body {
920 HttpRequestBody::Empty => builder,
921 HttpRequestBody::Bytes(bytes)
922 | HttpRequestBody::Json(bytes)
923 | HttpRequestBody::Form(bytes)
924 | HttpRequestBody::Multipart(bytes)
925 | HttpRequestBody::Ndjson(bytes) => builder.body(bytes),
926 HttpRequestBody::Stream(chunks) => {
927 let body_stream = futures_stream::iter(
928 chunks.into_iter().map(Result::<Bytes, std::io::Error>::Ok),
929 );
930 builder.body(reqwest::Body::wrap_stream(body_stream))
931 }
932 HttpRequestBody::Text(text) => builder.body(text),
933 }
934 }
935}
936
937/// Exercises request cache refresh branches for coverage-only tests.
938///
939/// # Returns
940/// Resolved URL and effective header count diagnostics.
941#[cfg(coverage)]
942#[doc(hidden)]
943pub(crate) async fn coverage_exercise_request_cache_paths() -> Vec<String> {
944 let client = crate::HttpClientFactory::new()
945 .create_default()
946 .expect("coverage HTTP client should build");
947 let mut request = client
948 .request(Method::GET, "https://example.com/coverage-cache")
949 .build();
950 *request
951 .resolved_url
952 .write()
953 .expect("resolved URL cache lock should not be poisoned") = None;
954 let url = request
955 .resolved_url()
956 .expect("coverage request URL should resolve");
957 let header_count = request
958 .effective_headers()
959 .await
960 .expect("coverage headers should compute")
961 .len();
962 let cached_header_count = request
963 .effective_headers()
964 .await
965 .expect("coverage headers should be cached")
966 .len();
967 vec![
968 url.to_string(),
969 header_count.to_string(),
970 cached_header_count.to_string(),
971 ]
972}
973
974impl Clone for HttpRequest {
975 fn clone(&self) -> Self {
976 Self {
977 method: self.method.clone(),
978 path: self.path.clone(),
979 query: self.query.clone(),
980 headers: self.headers.clone(),
981 body: self.body.clone(),
982 streaming_body: self.streaming_body.clone(),
983 resolved_url: RwLock::new(self.resolved_url_cached()),
984 effective_headers: self.effective_headers.clone(),
985 execution_options: self.execution_options.clone(),
986 context: self.context.clone(),
987 }
988 }
989}