qubit_http/request/http_request.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Immutable HTTP request object.
11
12use std::future::Future;
13use std::sync::RwLock;
14use std::time::Duration;
15
16use bytes::Bytes;
17use futures_util::stream as futures_stream;
18use http::{HeaderMap, HeaderName, HeaderValue, Method};
19use qubit_function::MutatingFunction;
20use reqwest::Response;
21use tokio_util::sync::CancellationToken;
22use url::Host;
23use url::Url;
24
25use crate::error::{backend_error_mapper::map_reqwest_error, ReqwestErrorPhase};
26use crate::{
27 AsyncHttpHeaderInjector, HttpError, HttpErrorKind, HttpHeaderInjector, HttpLogger,
28 HttpRequestStreamingBody, HttpResult,
29};
30
31use super::http_request_body::HttpRequestBody;
32use super::http_request_builder::HttpRequestBuilder;
33use super::http_request_retry_override::HttpRequestRetryOverride;
34use super::parse_header;
35
/// Per-request execution controls: timeouts, cooperative cancellation, and
/// the retry override.
///
/// Filled from the builder in `HttpRequest::new` and changed afterwards only
/// through the corresponding `HttpRequest` setters.
#[derive(Debug, Clone)]
struct HttpRequestExecutionOptions {
    /// Overrides client-wide request timeout when set; otherwise client default applies.
    request_timeout: Option<Duration>,
    /// Per-request write timeout. `HttpRequest::send_impl` applies it to each
    /// pre-send preparation step and to the send future itself.
    write_timeout: Duration,
    /// Per-request read timeout used during response body reads. This file
    /// only stores and exposes the value; it is not consumed by `send_impl`.
    read_timeout: Duration,
    /// Optional cancellation token checked before send and during I/O phases.
    cancellation_token: Option<CancellationToken>,
    /// Per-request retry override (enable/disable/method-policy/Retry-After behavior).
    retry_override: HttpRequestRetryOverride,
}
50
/// Request context captured from the originating client.
///
/// Holds everything URL resolution and header assembly need besides the
/// request's own fields: the base URL, the IPv4-only flag, default headers,
/// and both kinds of header injectors.
#[derive(Debug, Clone)]
struct HttpRequestContext {
    /// Base URL copied from client options, used to resolve relative `path`.
    base_url: Option<Url>,
    /// Whether resolved URLs must avoid IPv6 literal hosts.
    ipv4_only: bool,
    /// Client default headers snapshot captured when this request builder was created.
    /// These form the starting point of the merged outbound headers.
    default_headers: HeaderMap,
    /// Client sync header injectors snapshot captured when this request builder was created.
    /// Applied in order, before the async injectors.
    injectors: Vec<HttpHeaderInjector>,
    /// Client async header injectors snapshot captured when this request builder was created.
    /// Applied in order, after the sync injectors.
    async_injectors: Vec<AsyncHttpHeaderInjector>,
}
65
/// Snapshot of a single HTTP call produced by
/// [`crate::HttpRequestBuilder`].
///
/// All fields are private. Callers holding a `&mut HttpRequest` can still
/// adjust the snapshot through the setter methods; setters that affect URL
/// resolution or outbound headers also refresh or invalidate the matching
/// internal cache.
#[derive(Debug)]
pub struct HttpRequest {
    /// HTTP method (GET, POST, …).
    method: Method,
    /// Absolute URL string, or path joined with client `base_url` when not
    /// parseable as URL.
    path: String,
    /// Query string parameters as `(name, value)` pairs.
    query: Vec<(String, String)>,
    /// Headers added on top of client defaults and injector output.
    headers: HeaderMap,
    /// Serialized body variant.
    body: HttpRequestBody,
    /// Deferred per-attempt streaming body factory. When present, the send
    /// path uses it instead of `body`.
    streaming_body: Option<HttpRequestStreamingBody>,
    /// Lazily maintained cache for the currently resolved URL. Wrapped in an
    /// `RwLock` so `&self` accessors can fill it on demand.
    resolved_url: RwLock<Option<Url>>,
    /// Attempt-scoped cache of merged outbound headers after applying
    /// defaults/injectors/request-local headers. `None` means "recompute".
    effective_headers: Option<HeaderMap>,
    /// Request execution options and runtime controls.
    execution_options: HttpRequestExecutionOptions,
    /// Client-derived context for URL and header resolution.
    context: HttpRequestContext,
}
93
94impl HttpRequest {
95 /// Consumes a finished [`HttpRequestBuilder`] and freezes its fields into
96 /// an [`HttpRequest`].
97 ///
98 /// # Parameters
99 /// - `builder`: Populated builder produced by the HTTP client pipeline.
100 ///
101 /// # Returns
102 /// Snapshot ready for URL resolution, header assembly, and sending.
103 pub(super) fn new(builder: HttpRequestBuilder) -> Self {
104 let mut request = Self {
105 method: builder.method,
106 path: builder.path,
107 query: builder.query,
108 headers: builder.headers,
109 body: builder.body,
110 streaming_body: builder.streaming_body,
111 resolved_url: RwLock::new(None),
112 effective_headers: None,
113 execution_options: HttpRequestExecutionOptions {
114 request_timeout: builder.request_timeout,
115 write_timeout: builder.write_timeout,
116 read_timeout: builder.read_timeout,
117 cancellation_token: builder.cancellation_token,
118 retry_override: builder.retry_override,
119 },
120 context: HttpRequestContext {
121 base_url: builder.base_url,
122 ipv4_only: builder.ipv4_only,
123 default_headers: builder.default_headers,
124 injectors: builder.injectors,
125 async_injectors: builder.async_injectors,
126 },
127 };
128 request.refresh_resolved_url_cache();
129 request
130 }
131
    /// Returns the HTTP method stored on this request.
    ///
    /// # Returns
    /// Borrowed [`Method`] (for example GET or POST).
    pub fn method(&self) -> &Method {
        &self.method
    }
139
    /// Replaces the HTTP method; no cache is affected by this change.
    ///
    /// # Parameters
    /// - `method`: New [`Method`].
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_method(&mut self, method: Method) -> &mut Self {
        self.method = method;
        self
    }
151
152 /// Returns the path segment or absolute URL string stored on this request.
153 ///
154 /// # Returns
155 /// The raw path/URL before query string assembly; may be relative if a base
156 /// URL is set.
157 pub fn path(&self) -> &str {
158 &self.path
159 }
160
161 /// Replaces the path or absolute URL string.
162 ///
163 /// # Parameters
164 /// - `path`: New path or URL string (query string is managed separately via
165 /// [`Self::add_query_param`]).
166 ///
167 /// # Returns
168 /// `self` for method chaining.
169 pub fn set_path(&mut self, path: &str) -> &mut Self {
170 self.path = path.to_string();
171 self.refresh_resolved_url_cache();
172 self
173 }
174
175 /// Returns ordered `(name, value)` query pairs that will be appended to the
176 /// resolved URL.
177 ///
178 /// # Returns
179 /// Slice view of accumulated query parameters.
180 pub fn query(&self) -> &[(String, String)] {
181 &self.query
182 }
183
184 /// Appends a single query pair preserving insertion order.
185 ///
186 /// # Parameters
187 /// - `key`: Parameter name.
188 /// - `value`: Parameter value.
189 ///
190 /// # Returns
191 /// `self` for method chaining.
192 pub fn add_query_param(&mut self, key: &str, value: &str) -> &mut Self {
193 self.query.push((key.to_string(), value.to_string()));
194 self
195 }
196
    /// Removes every query pair added through [`Self::add_query_param`].
    ///
    /// Query text embedded in [`Self::path`] itself is not touched.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn clear_query_params(&mut self) -> &mut Self {
        self.query.clear();
        self
    }
205
    /// Returns request-local headers layered on top of client defaults and
    /// injector output at send time.
    ///
    /// # Returns
    /// Borrowed [`HeaderMap`] owned by this request only (not merged defaults).
    pub fn headers(&self) -> &HeaderMap {
        &self.headers
    }
214
    /// Parses and inserts one header from string name/value pairs.
    ///
    /// On success the merged-header cache is invalidated so the next
    /// [`Self::effective_headers`] call picks up the change. On a parse error
    /// nothing is modified.
    ///
    /// # Parameters
    /// - `name`: Header field name.
    /// - `value`: Header field value.
    ///
    /// # Returns
    /// `Ok(self)` on success.
    ///
    /// # Errors
    /// Returns [`HttpError`] when name or value cannot be converted into valid
    /// HTTP tokens.
    pub fn set_header(&mut self, name: &str, value: &str) -> Result<&mut Self, HttpError> {
        let (header_name, header_value) = parse_header(name, value)?;
        self.headers.insert(header_name, header_value);
        self.invalidate_effective_headers_cache();
        Ok(self)
    }
233
    /// Inserts one header using pre-validated [`HeaderName`] / [`HeaderValue`]
    /// types.
    ///
    /// Also invalidates the merged-header cache so the change is visible to
    /// the next [`Self::effective_headers`] call.
    ///
    /// # Parameters
    /// - `name`: Typed header name.
    /// - `value`: Typed header value.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_typed_header(&mut self, name: HeaderName, value: HeaderValue) -> &mut Self {
        self.headers.insert(name, value);
        self.invalidate_effective_headers_cache();
        self
    }
248
    /// Removes a header field from the request-local map by typed name.
    ///
    /// Client defaults and injector output are not affected. The merged-header
    /// cache is invalidated regardless of whether the header was present.
    ///
    /// # Parameters
    /// - `name`: Header name to strip from the request-local map.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn remove_header(&mut self, name: &HeaderName) -> &mut Self {
        self.headers.remove(name);
        self.invalidate_effective_headers_cache();
        self
    }
261
    /// Clears all request-local headers (defaults and injectors are unaffected
    /// until send) and invalidates the merged-header cache.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn clear_headers(&mut self) -> &mut Self {
        self.headers.clear();
        self.invalidate_effective_headers_cache();
        self
    }
272
    /// Returns the serialized body variant for this snapshot.
    ///
    /// This does not reflect a streaming body set through
    /// [`Self::set_streaming_body`], which is stored separately.
    ///
    /// # Returns
    /// Borrowed [`HttpRequestBody`].
    pub fn body(&self) -> &HttpRequestBody {
        &self.body
    }
280
    /// Replaces the entire body payload.
    ///
    /// Any previously configured streaming body is dropped, so the new `body`
    /// is what the send path uses.
    ///
    /// # Parameters
    /// - `body`: New [`HttpRequestBody`] variant.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_body(&mut self, body: HttpRequestBody) -> &mut Self {
        self.body = body;
        // A request carries either a buffered body or a streaming one.
        self.streaming_body = None;
        self
    }
293
    /// Sets deferred streaming upload body factory for this request.
    ///
    /// The buffered body is reset to [`HttpRequestBody::Empty`], so only the
    /// streaming body remains configured.
    ///
    /// # Parameters
    /// - `streaming_body`: Deferred body stream factory reused across retries.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_streaming_body(&mut self, streaming_body: HttpRequestStreamingBody) -> &mut Self {
        self.streaming_body = Some(streaming_body);
        // A request carries either a buffered body or a streaming one.
        self.body = HttpRequestBody::Empty;
        self
    }
306
    /// Returns the per-request total timeout, if any.
    ///
    /// # Returns
    /// `Some(duration)` when a request-specific timeout overrides the client
    /// default; otherwise `None`.
    pub fn request_timeout(&self) -> Option<Duration> {
        self.execution_options.request_timeout
    }
315
    /// Sets a per-request total timeout that overrides the client default for
    /// this send.
    ///
    /// The value is handed to the reqwest request builder when the request is
    /// sent.
    ///
    /// # Parameters
    /// - `timeout`: Upper bound for the entire request lifecycle handled by
    ///   reqwest.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_request_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.execution_options.request_timeout = Some(timeout);
        self
    }
329
    /// Drops the per-request timeout so the client-wide default applies again.
    ///
    /// After this call [`Self::request_timeout`] returns `None`.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn clear_request_timeout(&mut self) -> &mut Self {
        self.execution_options.request_timeout = None;
        self
    }
338
    /// Returns the write-phase timeout used while sending the request.
    ///
    /// # Returns
    /// The [`Duration`] that bounds each pre-send preparation step and the
    /// send future itself.
    pub fn write_timeout(&self) -> Duration {
        self.execution_options.write_timeout
    }
343
    /// Sets the write-phase timeout used while sending the request.
    ///
    /// # Parameters
    /// - `timeout`: New write-phase budget.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_write_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.execution_options.write_timeout = timeout;
        self
    }
349
    /// Returns the read-phase timeout used while reading response body bytes.
    ///
    /// # Returns
    /// The stored read-phase [`Duration`].
    pub fn read_timeout(&self) -> Duration {
        self.execution_options.read_timeout
    }
354
    /// Sets the read-phase timeout used while reading response body bytes.
    ///
    /// # Parameters
    /// - `timeout`: New read-phase budget.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_read_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.execution_options.read_timeout = timeout;
        self
    }
360
    /// Returns the optional base URL used to resolve relative [`Self::path`]
    /// values.
    ///
    /// # Returns
    /// `Some` when a base is configured; `None` when only absolute URLs in
    /// `path` are valid.
    pub fn base_url(&self) -> Option<&Url> {
        self.context.base_url.as_ref()
    }
370
    /// Sets the base URL used for internal URL resolution when `path` is not
    /// absolute, then refreshes the resolved-URL cache.
    ///
    /// # Parameters
    /// - `base_url`: Root URL to join against relative paths.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_base_url(&mut self, base_url: Url) -> &mut Self {
        self.context.base_url = Some(base_url);
        // The cached URL may have been joined against the previous base.
        self.refresh_resolved_url_cache();
        self
    }
384
    /// Removes the configured base URL so relative paths can no longer be
    /// resolved without resetting it.
    ///
    /// The resolved-URL cache is refreshed; for a relative `path` it becomes
    /// empty because resolution now fails.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn clear_base_url(&mut self) -> &mut Self {
        self.context.base_url = None;
        self.refresh_resolved_url_cache();
        self
    }
395
    /// Returns whether IPv6 literal hosts are rejected after URL resolution.
    ///
    /// # Returns
    /// `true` when a resolved URL whose host is an IPv6 literal must be
    /// rejected with [`HttpError::invalid_url`].
    pub fn ipv4_only(&self) -> bool {
        self.context.ipv4_only
    }
404
    /// Enables or disables IPv6 literal host rejection for resolved URLs.
    ///
    /// The resolved-URL cache is refreshed under the new setting, so a URL
    /// that no longer passes validation is dropped from the cache.
    ///
    /// # Parameters
    /// - `enabled`: When `true`, resolved URLs whose host is an IPv6 literal
    ///   are errors.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_ipv4_only(&mut self, enabled: bool) -> &mut Self {
        self.context.ipv4_only = enabled;
        self.refresh_resolved_url_cache();
        self
    }
418
    /// Returns the cooperative cancellation handle, if configured.
    ///
    /// # Returns
    /// `Some` token checked before send and during I/O; `None` when
    /// cancellation is not wired.
    pub fn cancellation_token(&self) -> Option<&CancellationToken> {
        self.execution_options.cancellation_token.as_ref()
    }
427
    /// Attaches a [`CancellationToken`] that can abort this request
    /// cooperatively, replacing any token set earlier.
    ///
    /// # Parameters
    /// - `token`: Shared cancellation source.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_cancellation_token(&mut self, token: CancellationToken) -> &mut Self {
        self.execution_options.cancellation_token = Some(token);
        self
    }
440
    /// Removes any cancellation token from this snapshot.
    ///
    /// After this call [`Self::cancellation_token`] returns `None`.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn clear_cancellation_token(&mut self) -> &mut Self {
        self.execution_options.cancellation_token = None;
        self
    }
449
    /// Returns the per-request retry override applied by the client pipeline.
    ///
    /// # Returns
    /// Borrowed [`HttpRequestRetryOverride`].
    pub fn retry_override(&self) -> &HttpRequestRetryOverride {
        &self.execution_options.retry_override
    }
457
    /// Replaces the retry override for this single request.
    ///
    /// # Parameters
    /// - `retry_override`: New override policy and knobs.
    ///
    /// # Returns
    /// `self` for method chaining.
    pub fn set_retry_override(&mut self, retry_override: HttpRequestRetryOverride) -> &mut Self {
        self.execution_options.retry_override = retry_override;
        self
    }
469
    /// Moves the current body out, leaving [`HttpRequestBody::Empty`] in its
    /// place.
    ///
    /// Used internally before handing the payload to reqwest so the snapshot is
    /// not cloned twice. [`Self::send_impl`] calls this on its non-streaming
    /// path, so after such a send this instance holds an empty body.
    ///
    /// # Returns
    /// Previous [`HttpRequestBody`] value.
    pub(crate) fn take_body(&mut self) -> HttpRequestBody {
        std::mem::replace(&mut self.body, HttpRequestBody::Empty)
    }
481
482 /// Assembles a reqwest [`RequestBuilder`](reqwest::RequestBuilder), applies
483 /// this snapshot's body, then sends with a bounded write phase.
484 ///
485 /// Centralizes send-attempt preparation and transport wiring:
486 /// - invalidates and recomputes effective headers for each attempt;
487 /// - emits request TRACE logs via the provided logger;
488 /// - applies query/timeout/body wiring plus cooperative cancellation and
489 /// write-timeout handling.
490 ///
491 /// # Parameters
492 /// - `backend`: Shared reqwest client.
493 /// - `logger`: Attempt-scoped request logger.
494 ///
495 /// # Returns
496 /// The successful [`Response`] or a mapped [`HttpError`].
497 ///
498 /// # Errors
499 /// - Cooperative cancellation while waiting on the send future.
500 /// - Transport failures mapped from reqwest.
501 /// - Write timeout when the send future does not complete within
502 /// `write_timeout`.
503 pub(crate) async fn send_impl(
504 &mut self,
505 backend: &reqwest::Client,
506 logger: &HttpLogger<'_>,
507 ) -> HttpResult<Response> {
508 // Effective headers are cached on the request. Each send attempt must
509 // invalidate and recompute them so injector output and request mutations
510 // are refreshed instead of reusing stale headers from prior attempts.
511 self.invalidate_effective_headers_cache();
512 let method = self.method().clone();
513 let request_url_context = self.resolved_url_with_query().ok();
514 let write_timeout = self.execution_options.write_timeout;
515 let cancellation_token = self.execution_options.cancellation_token.clone();
516 let headers = Self::await_pre_send_future(
517 self.effective_headers(),
518 write_timeout,
519 cancellation_token,
520 &method,
521 request_url_context.as_ref(),
522 "Request cancelled while preparing request",
523 format!(
524 "Write timeout after {:?} while preparing request",
525 write_timeout
526 ),
527 )
528 .await?
529 .clone();
530 let url = self.resolved_url()?;
531 let request_url = self.resolved_url_with_query()?;
532 // Log the request after computing effective headers so TRACE logs
533 // include the same query string used by the actual send path.
534 logger.log_request(self);
535 let mut builder = backend.request(method.clone(), url.clone());
536 builder = builder.headers(headers);
537 if !self.query.is_empty() {
538 builder = builder.query(self.query.as_slice());
539 }
540 if let Some(timeout) = self.execution_options.request_timeout {
541 builder = builder.timeout(timeout);
542 }
543 if let Some(streaming_body) = self.streaming_body.as_ref() {
544 let body = Self::await_pre_send_future(
545 async { Ok(streaming_body.to_reqwest_body().await) },
546 self.execution_options.write_timeout,
547 self.execution_options.cancellation_token.clone(),
548 &method,
549 Some(&request_url),
550 "Request cancelled while preparing streaming request body",
551 format!(
552 "Write timeout after {:?} while preparing streaming request body",
553 self.execution_options.write_timeout
554 ),
555 )
556 .await?;
557 builder = builder.body(body);
558 } else {
559 builder = Self::apply_request_body(builder, self.take_body());
560 }
561
562 let send_future =
563 tokio::time::timeout(self.execution_options.write_timeout, builder.send());
564 let next = if let Some(token) = self.execution_options.cancellation_token.as_ref() {
565 tokio::select! {
566 _ = token.cancelled() => {
567 return Err(HttpError::cancelled("Request cancelled while sending")
568 .with_method(&method)
569 .with_url(&request_url));
570 }
571 send_result = send_future => send_result,
572 }
573 } else {
574 send_future.await
575 };
576
577 match next {
578 Ok(Ok(response)) => Ok(response),
579 Ok(Err(error)) => Err(map_reqwest_error(
580 error,
581 HttpErrorKind::Transport,
582 ReqwestErrorPhase::Send,
583 Some(method.clone()),
584 Some(request_url.clone()),
585 )),
586 Err(_) => Err(HttpError::write_timeout(format!(
587 "Write timeout after {:?} while sending request",
588 self.execution_options.write_timeout
589 ))
590 .with_method(&method)
591 .with_url(&request_url)),
592 }
593 }
594
595 /// Waits for one asynchronous pre-send preparation step with cancellation and
596 /// write-timeout handling.
597 ///
598 /// # Parameters
599 /// - `future`: Preparation future, such as async header injection or streaming
600 /// body factory execution.
601 /// - `write_timeout`: Timeout budget reused for send preparation.
602 /// - `cancellation_token`: Optional request cancellation token.
603 /// - `method`: Request method for error context.
604 /// - `request_url`: Optional resolved request URL for error context.
605 /// - `cancellation_message`: Message used when cancellation wins.
606 /// - `timeout_message`: Message used when timeout wins.
607 ///
608 /// # Returns
609 /// The future output when it completes before cancellation or timeout.
610 ///
611 /// # Errors
612 /// Returns [`HttpErrorKind::Cancelled`] on cancellation,
613 /// [`HttpErrorKind::WriteTimeout`] on timeout, or propagates the future's own
614 /// error.
615 async fn await_pre_send_future<T, F>(
616 future: F,
617 write_timeout: Duration,
618 cancellation_token: Option<CancellationToken>,
619 method: &Method,
620 request_url: Option<&Url>,
621 cancellation_message: &str,
622 timeout_message: String,
623 ) -> HttpResult<T>
624 where
625 F: Future<Output = HttpResult<T>>,
626 {
627 let timed = tokio::time::timeout(write_timeout, future);
628 let next = if let Some(token) = cancellation_token.as_ref() {
629 tokio::select! {
630 _ = token.cancelled() => {
631 return Err(Self::pre_send_cancelled_error(
632 cancellation_message,
633 method,
634 request_url,
635 ));
636 }
637 result = timed => result,
638 }
639 } else {
640 timed.await
641 };
642
643 match next {
644 Ok(result) => result,
645 Err(_) => Err(Self::pre_send_write_timeout_error(
646 timeout_message,
647 method,
648 request_url,
649 )),
650 }
651 }
652
653 /// Builds a cancellation error for pre-send preparation.
654 ///
655 /// # Parameters
656 /// - `message`: Cancellation message.
657 /// - `method`: Request method for context.
658 /// - `request_url`: Optional request URL for context.
659 ///
660 /// # Returns
661 /// Cancellation [`HttpError`] with request context attached.
662 fn pre_send_cancelled_error(
663 message: &str,
664 method: &Method,
665 request_url: Option<&Url>,
666 ) -> HttpError {
667 let mut error = HttpError::cancelled(message).with_method(method);
668 if let Some(request_url) = request_url {
669 error = error.with_url(request_url);
670 }
671 error
672 }
673
674 /// Builds a write-timeout error for pre-send preparation.
675 ///
676 /// # Parameters
677 /// - `message`: Timeout message.
678 /// - `method`: Request method for context.
679 /// - `request_url`: Optional request URL for context.
680 ///
681 /// # Returns
682 /// Write-timeout [`HttpError`] with request context attached.
683 fn pre_send_write_timeout_error(
684 message: String,
685 method: &Method,
686 request_url: Option<&Url>,
687 ) -> HttpError {
688 let mut error = HttpError::write_timeout(message).with_method(method);
689 if let Some(request_url) = request_url {
690 error = error.with_url(request_url);
691 }
692 error
693 }
694
695 /// Returns the resolved URL for current request fields, computing and
696 /// caching it on demand.
697 ///
698 /// # Returns
699 /// Resolved [`Url`] value (cloned from cache when already computed).
700 ///
701 /// # Errors
702 /// Returns [`HttpError::invalid_url`] when parsing fails, the base URL is
703 /// missing for a relative path, joining fails, or [`Self::ipv4_only`]
704 /// rejects an IPv6 literal host.
705 pub(crate) fn resolved_url(&self) -> Result<Url, HttpError> {
706 let cached = match self.resolved_url.read() {
707 Ok(guard) => guard.clone(),
708 Err(_) => return Err(HttpError::other("Resolved URL cache read lock poisoned")),
709 };
710 if let Some(url) = cached.as_ref() {
711 return Ok(url.clone());
712 }
713 let resolved = self.compute_resolved_url()?;
714 match self.resolved_url.write() {
715 Ok(mut guard) => *guard = Some(resolved.clone()),
716 Err(_) => return Err(HttpError::other("Resolved URL cache write lock poisoned")),
717 }
718 Ok(resolved)
719 }
720
721 /// Returns the resolved URL plus request-builder query parameters.
722 ///
723 /// This mirrors the URL sent by [`Self::send_impl`]: query pairs already
724 /// present in [`Self::path`] are preserved, and pairs from
725 /// [`Self::query`] are appended in insertion order.
726 ///
727 /// # Returns
728 /// Resolved [`Url`] including query parameters from this request snapshot.
729 ///
730 /// # Errors
731 /// Propagates [`Self::resolved_url`] errors for invalid or unresolved URLs.
732 pub(crate) fn resolved_url_with_query(&self) -> Result<Url, HttpError> {
733 let mut url = self.resolved_url()?;
734 if !self.query.is_empty() {
735 {
736 let mut pairs = url.query_pairs_mut();
737 for (key, value) in &self.query {
738 pairs.append_pair(key, value);
739 }
740 }
741 }
742 Ok(url)
743 }
744
    /// Returns cached resolved URL when available.
    ///
    /// Never triggers URL computation.
    ///
    /// # Returns
    /// A clone of the cached [`Url`], or `None` when nothing is cached or the
    /// cache lock is poisoned.
    pub fn resolved_url_cached(&self) -> Option<Url> {
        self.resolved_url
            .read()
            .map(|guard| guard.clone())
            .unwrap_or_default()
    }
752
753 /// Recomputes and stores the current resolved URL.
754 fn refresh_resolved_url_cache(&mut self) {
755 if let Ok(mut guard) = self.resolved_url.write() {
756 *guard = self.compute_resolved_url().ok();
757 }
758 }
759
760 /// Computes the resolved URL from current path/base/ipv4 settings.
761 fn compute_resolved_url(&self) -> Result<Url, HttpError> {
762 if let Ok(url) = Url::parse(&self.path) {
763 self.validate_resolved_url_host(&url)?;
764 return Ok(url);
765 }
766
767 let base = self.context.base_url.as_ref().ok_or_else(|| {
768 HttpError::invalid_url(format!(
769 "Cannot resolve relative path '{}' without base_url",
770 self.path
771 ))
772 })?;
773
774 let url = base.join(&self.path).map_err(|error| {
775 HttpError::invalid_url(format!(
776 "Failed to resolve path '{}' against base URL '{}': {}",
777 self.path, base, error
778 ))
779 })?;
780 self.validate_resolved_url_host(&url)?;
781 Ok(url)
782 }
783
784 /// Enforces [`Self::ipv4_only`] by rejecting IPv6 literal hosts in `url`.
785 ///
786 /// # Parameters
787 /// - `url`: Candidate URL after parsing or joining.
788 ///
789 /// # Returns
790 /// `Ok(())` when the host is acceptable.
791 ///
792 /// # Errors
793 /// [`HttpError::invalid_url`] when `ipv4_only` is `true` and the host is an
794 /// IPv6 literal.
795 fn validate_resolved_url_host(&self, url: &Url) -> Result<(), HttpError> {
796 if self.context.ipv4_only && matches!(url.host(), Some(Host::Ipv6(_))) {
797 return Err(HttpError::invalid_url(format!(
798 "IPv6 literal host is not allowed when ipv4_only=true: {}",
799 url
800 )));
801 }
802 Ok(())
803 }
804
805 /// Returns the attempt-scoped merged outbound headers.
806 ///
807 /// On first call after invalidation, this computes merged headers by
808 /// replaying defaults/injectors/request-local headers and stores them in
809 /// [`Self::effective_headers`]. Later calls in the same attempt return the
810 /// cached map.
811 ///
812 /// Why this API is async:
813 /// - async injectors are part of header assembly and may perform awaitable
814 /// work (for example token refresh or other I/O-backed value resolution).
815 /// - therefore header materialization cannot be fully synchronous.
816 ///
817 /// Merge order (later wins on duplicates):
818 /// 1. Client default headers snapshot captured when the builder was
819 /// created.
820 /// 2. Synchronous injector output in registration order.
821 /// 3. Asynchronous injector output in registration order.
822 /// 4. Request-local headers from this snapshot.
823 ///
824 /// # Returns
825 /// Borrowed merged [`HeaderMap`] from the cache.
826 ///
827 /// # Errors
828 /// Propagates failures returned by any injector's `apply` implementation.
829 pub(crate) async fn effective_headers(&mut self) -> HttpResult<&HeaderMap> {
830 if self.effective_headers.is_none() {
831 self.effective_headers = Some(self.compute_effective_headers().await?);
832 }
833 Ok(self
834 .effective_headers
835 .as_ref()
836 .expect("effective headers cache must be populated after computation"))
837 }
838
    /// Returns cached merged outbound headers when available.
    ///
    /// Never runs injectors.
    ///
    /// # Returns
    /// `Some` after a successful [`Self::effective_headers`] call that has not
    /// been invalidated since; otherwise `None`.
    pub(crate) fn effective_headers_cached(&self) -> Option<&HeaderMap> {
        self.effective_headers.as_ref()
    }
843
    /// Clears the effective-header cache.
    ///
    /// After this call the next [`Self::effective_headers`] invocation
    /// recomputes merged headers by re-running defaults and header injectors.
    ///
    /// Why this is needed:
    /// - request-local headers may have been mutated (`set_header`, `clear_headers`, etc.);
    /// - injector output may be time-sensitive (for example rotating auth token
    ///   or timestamp-based signatures), so each send attempt should recompute
    ///   merged headers instead of reusing stale values from prior attempts.
    ///
    /// When to call:
    /// - immediately before starting a new send attempt;
    /// - after any mutation that can change final outbound headers.
    pub(crate) fn invalidate_effective_headers_cache(&mut self) {
        self.effective_headers = None;
    }
862
863 /// Computes merged outbound headers without touching the cache.
864 async fn compute_effective_headers(&self) -> HttpResult<HeaderMap> {
865 let mut headers = self.context.default_headers.clone();
866
867 for injector in &self.context.injectors {
868 injector.apply(&mut headers)?;
869 }
870 for injector in &self.context.async_injectors {
871 injector.apply(&mut headers).await?;
872 }
873
874 headers.extend(self.headers.clone());
875 Ok(headers)
876 }
877
878 /// Returns a pre-cancelled [`HttpError`] when a token is present and
879 /// already cancelled.
880 ///
881 /// # Parameters
882 /// - `message`: Human-readable cancellation reason.
883 ///
884 /// # Returns
885 /// `Some` [`HttpError`] (including method context and cached URL when
886 /// available) when a token exists and is already cancelled; otherwise
887 /// `None`.
888 pub(crate) fn cancelled_error_if_needed(&self, message: &str) -> Option<HttpError> {
889 if self
890 .execution_options
891 .cancellation_token
892 .as_ref()
893 .is_some_and(CancellationToken::is_cancelled)
894 {
895 let mut error = HttpError::cancelled(message.to_string()).with_method(&self.method);
896 if let Ok(url) = self.resolved_url_with_query() {
897 error = error.with_url(&url);
898 }
899 Some(error)
900 } else {
901 None
902 }
903 }
904
905 /// Attaches the correct reqwest body encoding for each [`HttpRequestBody`]
906 /// variant.
907 ///
908 /// # Parameters
909 /// - `builder`: Partially configured [`reqwest::RequestBuilder`]
910 /// (method/URL/headers already set).
911 /// - `body`: Payload variant to attach; moved into the builder.
912 ///
913 /// # Returns
914 /// The same builder with an appropriate `.body(...)` applied (or unchanged
915 /// for [`HttpRequestBody::Empty`]).
916 fn apply_request_body(
917 builder: reqwest::RequestBuilder,
918 body: HttpRequestBody,
919 ) -> reqwest::RequestBuilder {
920 match body {
921 HttpRequestBody::Empty => builder,
922 HttpRequestBody::Bytes(bytes)
923 | HttpRequestBody::Json(bytes)
924 | HttpRequestBody::Form(bytes)
925 | HttpRequestBody::Multipart(bytes)
926 | HttpRequestBody::Ndjson(bytes) => builder.body(bytes),
927 HttpRequestBody::Stream(chunks) => {
928 let body_stream = futures_stream::iter(
929 chunks.into_iter().map(Result::<Bytes, std::io::Error>::Ok),
930 );
931 builder.body(reqwest::Body::wrap_stream(body_stream))
932 }
933 HttpRequestBody::Text(text) => builder.body(text),
934 }
935 }
936}
937
938impl Clone for HttpRequest {
939 fn clone(&self) -> Self {
940 Self {
941 method: self.method.clone(),
942 path: self.path.clone(),
943 query: self.query.clone(),
944 headers: self.headers.clone(),
945 body: self.body.clone(),
946 streaming_body: self.streaming_body.clone(),
947 resolved_url: RwLock::new(self.resolved_url_cached()),
948 effective_headers: self.effective_headers.clone(),
949 execution_options: self.execution_options.clone(),
950 context: self.context.clone(),
951 }
952 }
953}