Skip to main content

qubit_http/client/
http_client.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! HTTP client: builds requests, applies defaults and interceptors, executes
//! them with optional retry, and exposes SSE helpers with reconnect.
//!
//! Requests are sent through [`HttpClient::execute`], which runs either a
//! single attempt or a retry loop depending on the resolved retry options; the
//! crate-internal `HttpClient::execute_once` always performs exactly one
//! attempt. The retry policy comes from [`crate::HttpClientOptions::retry`]
//! unless overridden per request.
17
18use std::time::{Duration, Instant};
19
20use qubit_retry::{
21    AttemptFailure, AttemptFailureDecision, Retry, RetryContext, RetryError, RetryErrorReason,
22};
23
24use crate::sse::SseReconnectRunner;
25use crate::{
26    response::HttpResponseOptions,
27    sse::{SseEventStream, SseReconnectOptions},
28    AsyncHttpHeaderInjector, HttpClientOptions, HttpError, HttpHeaderInjector, HttpLogger,
29    HttpRequest, HttpRequestBuilder, HttpRequestInterceptor, HttpRequestInterceptors, HttpResponse,
30    HttpResponseInterceptor, HttpResponseInterceptors, HttpResponseMeta, HttpResult,
31    HttpRetryOptions,
32};
33
34/// High-level HTTP client: default headers, injectors, interceptors, logging,
35/// timeouts, and optional per-request retry.
36///
37/// [`Clone`] is shallow and cheap enough for typical use (including passing into
38/// retry closures); cloning does not duplicate the underlying connection pool
39/// beyond what [`reqwest::Client`] already shares.
40#[derive(Clone)]
41pub struct HttpClient {
42    /// Pluggable low-level HTTP stack used to send requests (currently reqwest).
43    pub(super) backend: reqwest::Client,
44    /// Timeouts, proxy, logging, default headers, and related settings.
45    pub(super) options: HttpClientOptions,
46    /// Header injectors applied to every outgoing request after default
47    /// headers.
48    pub(super) injectors: Vec<HttpHeaderInjector>,
49    /// Async header injectors applied after sync injectors and before request-level headers.
50    pub(super) async_injectors: Vec<AsyncHttpHeaderInjector>,
51    /// Request interceptors applied before request send for each attempt.
52    request_interceptors: HttpRequestInterceptors,
53    /// Response interceptors applied on successful responses before return.
54    response_interceptors: HttpResponseInterceptors,
55}
56
57impl HttpClient {
58    /// Wraps a built [`reqwest::Client`] with the given options and an empty
59    /// injector list.
60    ///
61    /// # Parameters
62    /// - `backend`: Configured low-level HTTP client used for I/O.
63    /// - `options`: Client-wide timeouts, headers, proxy, logging, etc.
64    ///
65    /// # Returns
66    /// A new [`HttpClient`] with no injectors until
67    /// [`HttpClient::add_header_injector`] is called.
68    pub(crate) fn new(backend: reqwest::Client, options: HttpClientOptions) -> Self {
69        Self {
70            backend,
71            options,
72            injectors: Vec::new(),
73            async_injectors: Vec::new(),
74            request_interceptors: HttpRequestInterceptors::new(),
75            response_interceptors: HttpResponseInterceptors::new(),
76        }
77    }
78
79    /// Returns a reference to the client-wide options (timeouts, proxy, logging,
80    /// default headers, retry defaults, etc.).
81    ///
82    /// # Returns
83    /// Immutable borrow of [`HttpClientOptions`]. Never `None`; always the
84    /// options installed on this client.
85    pub fn options(&self) -> &HttpClientOptions {
86        &self.options
87    }
88
89    /// Appends a [`HttpHeaderInjector`] so its mutation function runs on every
90    /// request. Mutates `self` in place.
91    ///
92    /// # Parameters
93    /// - `injector`: Injector to append (order is preserved).
94    pub fn add_header_injector(&mut self, injector: HttpHeaderInjector) {
95        self.injectors.push(injector);
96    }
97
98    /// Appends an async header injector whose mutation runs after sync injectors.
99    /// Mutates `self` in place.
100    ///
101    /// # Parameters
102    /// - `injector`: Async injector to append (order is preserved).
103    pub fn add_async_header_injector(&mut self, injector: AsyncHttpHeaderInjector) {
104        self.async_injectors.push(injector);
105    }
106
107    /// Appends a request interceptor run before each send attempt (including
108    /// each retry attempt). Mutates `self` in place.
109    ///
110    /// # Parameters
111    /// - `interceptor`: Request interceptor to append (order is preserved).
112    pub fn add_request_interceptor(&mut self, interceptor: HttpRequestInterceptor) {
113        self.request_interceptors.push(interceptor);
114    }
115
116    /// Appends a response interceptor run only after a successful HTTP status
117    /// (after the internal `execute_once` step) and before response body logging.
118    /// Mutates `self` in place.
119    ///
120    /// # Parameters
121    /// - `interceptor`: Response interceptor to append (order is preserved).
122    pub fn add_response_interceptor(&mut self, interceptor: HttpResponseInterceptor) {
123        self.response_interceptors.push(interceptor);
124    }
125
126    /// Validates and adds one client-level default header.
127    ///
128    /// The header is applied to every request before header injectors and
129    /// request-level headers.
130    ///
131    /// # Parameters
132    /// - `name`: Header name.
133    /// - `value`: Header value.
134    ///
135    /// # Returns
136    /// `Ok(self)` after the header is stored.
137    ///
138    /// # Errors
139    /// Returns [`HttpError`] when the header name or value is invalid.
140    pub fn add_header(&mut self, name: &str, value: &str) -> HttpResult<&mut Self> {
141        self.options.add_header(name, value)?;
142        Ok(self)
143    }
144
145    /// Validates and adds many client-level default headers atomically.
146    ///
147    /// If any input pair is invalid, no header from this batch is applied.
148    ///
149    /// # Parameters
150    /// - `headers`: Iterator of `(name, value)` pairs.
151    ///
152    /// # Returns
153    /// `Ok(self)` after all headers are stored.
154    ///
155    /// # Errors
156    /// Returns [`HttpError`] when any name/value pair is invalid (nothing from
157    /// this call is applied).
158    pub fn add_headers(&mut self, headers: &[(&str, &str)]) -> HttpResult<&mut Self> {
159        self.options.add_headers(headers)?;
160        Ok(self)
161    }
162
163    /// Clears all synchronous header injectors. Mutates `self` in place.
164    pub fn clear_header_injectors(&mut self) {
165        self.injectors.clear();
166    }
167
168    /// Clears all async header injectors. Mutates `self` in place.
169    pub fn clear_async_header_injectors(&mut self) {
170        self.async_injectors.clear();
171    }
172
173    /// Clears all request interceptors. Mutates `self` in place.
174    pub fn clear_request_interceptors(&mut self) {
175        self.request_interceptors.clear();
176    }
177
178    /// Clears all response interceptors. Mutates `self` in place.
179    pub fn clear_response_interceptors(&mut self) {
180        self.response_interceptors.clear();
181    }
182
183    /// Starts building an [`HttpRequest`] with the given method and path
184    /// (relative or absolute URL string).
185    ///
186    /// # Parameters
187    /// - `method`: HTTP verb (GET, POST, …).
188    /// - `path`: Path relative to [`HttpClientOptions::base_url`] or a full URL
189    ///   string.
190    ///
191    /// # Returns
192    /// A new [`HttpRequestBuilder`] borrowing this client for defaults; it is
193    /// not sent until built and passed to [`HttpClient::execute`] (or related
194    /// APIs).
195    pub fn request(&self, method: http::Method, path: &str) -> HttpRequestBuilder {
196        HttpRequestBuilder::new(method, path, self)
197    }
198
199    /// Returns a clone of the client-level default header map.
200    ///
201    /// Used when constructing a built [`HttpRequest`] so the snapshot reflects
202    /// headers at build time.
203    ///
204    /// # Returns
205    /// Owned [`http::HeaderMap`] copy of [`HttpClientOptions`] default headers.
206    pub(crate) fn headers_snapshot(&self) -> http::HeaderMap {
207        self.options.default_headers.clone()
208    }
209
210    /// Returns a clone of the registered synchronous header injectors list.
211    ///
212    /// # Returns
213    /// New [`Vec`] with the same injectors and order as on this client.
214    pub(crate) fn injectors_snapshot(&self) -> Vec<HttpHeaderInjector> {
215        self.injectors.clone()
216    }
217
218    /// Returns a clone of the registered async header injectors list.
219    ///
220    /// # Returns
221    /// New [`Vec`] with the same injectors and order as on this client.
222    pub(crate) fn async_injectors_snapshot(&self) -> Vec<AsyncHttpHeaderInjector> {
223        self.async_injectors.clone()
224    }
225
226    /// Sends the request and returns a unified [`HttpResponse`].
227    ///
228    /// Chooses retry vs single attempt from resolved [`HttpRetryOptions`] for
229    /// this request. Performs network I/O and may await the internal
230    /// `execute_once` path
231    /// multiple times with backoff between attempts when retry is enabled.
232    ///
233    /// # Parameters
234    /// - `request`: Built request (URL resolved against `base_url` if path is
235    ///   not absolute).
236    ///
237    /// # Returns
238    /// - `Ok(HttpResponse)` when the HTTP status is success
239    ///   ([`http::StatusCode::is_success`]).
240    /// - `Err(HttpError)` when any attempt fails for URL/header validation,
241    ///   cancellation, interceptor failure, transport/timeout, non-success
242    ///   status, or when the retry executor aborts or exceeds limits.
243    pub async fn execute(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
244        let retry_options = self.options.retry.resolve(&request);
245        if retry_options.should_retry(&request) {
246            self.execute_with_retry(request, retry_options).await
247        } else {
248            self.execute_once(request).await
249        }
250    }
251
252    /// Performs one non-retrying execution: pre-send cancellation check,
253    /// request interceptors, resolve URL, merge headers, log the request, send
254    /// with configured timeouts, map non-success status to an error, then
255    /// response interceptors and response logging. The returned body is read
256    /// lazily according to [`HttpResponse`].
257    ///
258    /// # Parameters
259    /// - `request`: Built request to send (same fields as for
260    ///   [`HttpClient::execute`]).
261    ///
262    /// # Returns
263    /// - `Ok(HttpResponse)` on success status and after interceptors/logging
264    ///   steps succeed.
265    /// - `Err(HttpError)` from request/response interceptors, cancellation,
266    ///   send/transport errors, status mapping, URL resolution for the response
267    ///   wrapper, or response logging failures.
268    ///
269    /// # Side effects
270    /// Network I/O, optional logging, and user-provided interceptor callbacks.
271    pub(crate) async fn execute_once(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
272        let mut request = request;
273        if let Some(error) = request.cancelled_error_if_needed("Request cancelled before sending") {
274            return Err(error);
275        }
276        self.request_interceptors.apply(&mut request)?;
277        let response = self
278            .prepare_and_send_once(request, "Request cancelled before sending")
279            .await?;
280        let mut response = response
281            .into_success_or_status_error("HTTP request failed")
282            .await?;
283        self.response_interceptors.apply(&mut response.meta)?;
284        let logger = HttpLogger::new(&self.options);
285        logger.log_response(&mut response).await?;
286        Ok(response)
287    }
288
289    /// Single low-level send: cancellation check, request logging, one backend
290    /// round-trip, then wraps the backend response as [`HttpResponse`].
291    ///
292    /// Does not run response interceptors or success-status enforcement; those
293    /// happen in [`HttpClient::execute_once`] after this returns.
294    ///
295    /// # Parameters
296    /// - `request`: Request to send (may be mutated for logging/send path).
297    /// - `cancellation_message`: Message embedded if the request is already
298    ///   cancelled when this runs.
299    ///
300    /// # Returns
301    /// - `Ok(HttpResponse)` with lazy body and metadata.
302    /// - `Err(HttpError)` if cancelled before send, send fails, or
303    ///   [`HttpRequest::resolved_url`] fails when building the wrapper.
304    ///
305    /// # Side effects
306    /// Async network I/O and request logging via [`HttpLogger`].
307    async fn prepare_and_send_once(
308        &self,
309        request: HttpRequest,
310        cancellation_message: &str,
311    ) -> HttpResult<HttpResponse> {
312        let mut request = request;
313        if let Some(error) = request.cancelled_error_if_needed(cancellation_message) {
314            return Err(error);
315        }
316        let logger = HttpLogger::new(&self.options);
317        let backend_response = request.send_impl(&self.backend, &logger).await?;
318        let meta = HttpResponseMeta::new(
319            backend_response.status(),
320            backend_response.headers().clone(),
321            backend_response.url().clone(),
322            request.method().clone(),
323        );
324        let response_options = HttpResponseOptions::new(
325            self.options.error_response_preview_limit,
326            self.options.sse_json_mode,
327            self.options.sse_max_line_bytes,
328            self.options.sse_max_frame_bytes,
329            self.options.sse_done_marker_policy.clone(),
330        );
331        Ok(HttpResponse::from_backend(
332            meta,
333            backend_response,
334            request.read_timeout(),
335            request.cancellation_token().cloned(),
336            request.resolved_url_with_query()?,
337            response_options,
338        ))
339    }
340
341    /// Runs [`HttpClient::execute_once`] under the given retry policy.
342    ///
343    /// Between attempts waits according to the resolved retry delay, optionally
344    /// honoring `Retry-After` by extending the next sleep. Each attempt clones
345    /// the request so request bodies can be rebuilt when supported.
346    ///
347    /// # Parameters
348    /// - `request`: Built request passed to each [`HttpClient::execute_once`]
349    ///   attempt (cloned per retry closure).
350    /// - `options`: Effective retry options for this request (from resolution
351    ///   in [`HttpClient::execute`]).
352    ///
353    /// # Returns
354    /// - `Ok(HttpResponse)` when an attempt completes with success status.
355    /// - `Err(HttpError)` from retry option validation, from any
356    ///   [`HttpClient::execute_once`] failure that is non-retryable, or from
357    ///   retry exhaustion/max-duration enforcement.
358    ///
359    /// # Side effects
360    /// Multiple async HTTP attempts and optional sleeps.
361    async fn execute_with_retry(
362        &self,
363        request: HttpRequest,
364        options: HttpRetryOptions,
365    ) -> HttpResult<HttpResponse> {
366        let honor_retry_after = request.retry_override().should_honor_retry_after();
367        let retry_options = options.to_executor_options()?;
368        let started_at = Instant::now();
369
370        let retry_policy_options = options.clone();
371        let retry_delay_options = retry_options.clone();
372        let retry_policy = match Retry::<HttpError>::builder()
373            .options(retry_options)
374            .retry_after_from_error(move |error| {
375                honor_retry_after.then_some(error.retry_after).flatten()
376            })
377            .on_failure(
378                move |failure: &AttemptFailure<HttpError>, context: &RetryContext| {
379                    Self::retry_failure_decision(
380                        failure,
381                        context,
382                        &retry_policy_options,
383                        &retry_delay_options,
384                    )
385                },
386            )
387            .build()
388        {
389            Ok(retry_policy) => retry_policy,
390            Err(error) => {
391                return Err(HttpError::other(format!(
392                    "Invalid HTTP retry executor: {error}"
393                )))
394            }
395        };
396
397        let cancellation_token = request.cancellation_token().cloned();
398        let request_method = request.method().clone();
399        let request_url = request.resolved_url_with_query().ok();
400        let retry_request = request.clone();
401        let retry_future = retry_policy.run_async(|| {
402            let attempt_request = retry_request.clone();
403            async move { self.execute_once(attempt_request).await }
404        });
405
406        let retry_result = if let Some(token) = cancellation_token.as_ref() {
407            tokio::select! {
408                _ = token.cancelled() => {
409                    return Err(Self::retry_cancelled_error(
410                        "HTTP retry cancelled while waiting before next attempt",
411                        &request_method,
412                        request_url.as_ref(),
413                    ));
414                }
415                result = retry_future => result,
416            }
417        } else {
418            retry_future.await
419        };
420
421        match retry_result {
422            Ok(response) => Ok(response),
423            Err(error) => Err(Self::map_retry_error(
424                error,
425                started_at,
426                options.max_duration,
427                options.max_attempts,
428            )),
429        }
430    }
431
432    /// Returns whether `error` is retryable under `options`.
433    ///
434    /// # Parameters
435    /// - `error`: Error produced by a single HTTP attempt.
436    /// - `options`: Effective retry options for the request.
437    ///
438    /// # Returns
439    /// `true` if another attempt may be scheduled.
440    fn is_retryable_error(error: &HttpError, options: &HttpRetryOptions) -> bool {
441        if error.kind == crate::HttpErrorKind::Status {
442            error
443                .status
444                .is_some_and(|status| options.is_retryable_status(status))
445        } else {
446            options.is_retryable_error_kind(error.kind)
447        }
448    }
449
450    /// Computes the sleep before the next retry attempt.
451    ///
452    /// # Parameters
453    /// - `base_delay`: Delay selected from retry policy and jitter.
454    /// - `retry_after_hint`: Retry-After delay extracted by the retry policy.
455    ///
456    /// # Returns
457    /// `base_delay`, or the larger `Retry-After` value when present.
458    fn retry_sleep_delay(base_delay: Duration, retry_after_hint: Option<Duration>) -> Duration {
459        retry_after_hint
460            .map(|retry_after| retry_after.max(base_delay))
461            .unwrap_or(base_delay)
462    }
463
464    /// Decides how HTTP retry should handle one failed attempt.
465    ///
466    /// # Parameters
467    /// - `failure`: Failed attempt reported by `qubit-retry`.
468    /// - `context`: Retry context for the failed attempt.
469    /// - `policy_options`: HTTP retry allowlists and method policy.
470    /// - `delay_options`: Retry executor options used to calculate base delay.
471    ///
472    /// # Returns
473    /// Decision for `qubit-retry`: abort non-retryable HTTP errors, otherwise
474    /// retry after the larger base delay / Retry-After hint. Non-HTTP runtime
475    /// failures fall back to the retry executor default.
476    fn retry_failure_decision(
477        failure: &AttemptFailure<HttpError>,
478        context: &RetryContext,
479        policy_options: &HttpRetryOptions,
480        delay_options: &qubit_retry::RetryOptions,
481    ) -> AttemptFailureDecision {
482        let Some(error) = failure.as_error() else {
483            return AttemptFailureDecision::UseDefault;
484        };
485        if !Self::is_retryable_error(error, policy_options) {
486            return AttemptFailureDecision::Abort;
487        }
488
489        let base_delay = delay_options.delay_for_attempt(context.attempt());
490        let sleep_delay = Self::retry_sleep_delay(base_delay, context.retry_after_hint());
491        AttemptFailureDecision::RetryAfter(sleep_delay)
492    }
493
494    /// Exposes retry failure decisions to coverage-only integration tests.
495    ///
496    /// # Parameters
497    /// - `failure`: Failed attempt to inspect.
498    /// - `context`: Retry context for the failed attempt.
499    /// - `policy_options`: HTTP retry policy options.
500    /// - `delay_options`: Retry executor options.
501    ///
502    /// # Returns
503    /// The retry decision selected by [`Self::retry_failure_decision`].
504    #[cfg(coverage)]
505    #[doc(hidden)]
506    pub(crate) fn coverage_retry_failure_decision(
507        failure: &AttemptFailure<HttpError>,
508        context: &RetryContext,
509        policy_options: &HttpRetryOptions,
510        delay_options: &qubit_retry::RetryOptions,
511    ) -> AttemptFailureDecision {
512        Self::retry_failure_decision(failure, context, policy_options, delay_options)
513    }
514
515    /// Adds retry-attempt exhaustion context to the last attempt error.
516    ///
517    /// # Parameters
518    /// - `error`: Last retryable attempt error.
519    /// - `attempts`: Number of attempts already made.
520    /// - `max_attempts`: Configured maximum attempts.
521    ///
522    /// # Returns
523    /// The same error with retry exhaustion details appended to its message.
524    fn map_retry_attempts_exhausted(
525        mut error: HttpError,
526        attempts: u32,
527        max_attempts: u32,
528    ) -> HttpError {
529        error.message = format!(
530            "{} (retry attempts exhausted: {attempts}/{max_attempts})",
531            error.message
532        );
533        error
534    }
535
536    /// Builds the error returned when retry policy stops early.
537    ///
538    /// # Parameters
539    /// - `error`: Attempt error that the retry policy chose not to retry.
540    /// - `attempts`: Number of attempts already made.
541    /// - `started_at`: Start instant of the retry flow.
542    ///
543    /// # Returns
544    /// [`HttpError::retry_aborted`] with the original [`HttpError`] chained as
545    /// source for callers that need the underlying status or transport error.
546    fn map_retry_aborted(error: HttpError, attempts: u32, started_at: Instant) -> HttpError {
547        let elapsed = started_at.elapsed();
548        let summary = error.message.clone();
549        HttpError::retry_aborted(format!(
550            "HTTP retry aborted after {attempts} attempt(s) in {elapsed:?}: {summary}"
551        ))
552        .with_source(error)
553    }
554
555    /// Builds the error when retry max-duration is exhausted.
556    ///
557    /// # Parameters
558    /// - `started_at`: Start instant of the retry flow.
559    /// - `max_duration`: Configured max-duration budget.
560    /// - `last_error`: Last captured retryable attempt error, if any.
561    ///
562    /// # Returns
563    /// Augments the last failure when present, otherwise a dedicated
564    /// max-duration error with no underlying attempt error.
565    fn map_retry_max_duration_exceeded(
566        started_at: Instant,
567        max_duration: Option<Duration>,
568        last_error: Option<HttpError>,
569    ) -> HttpError {
570        let elapsed = started_at.elapsed();
571        let max_duration_text = max_duration
572            .map(|duration| format!("{duration:?}"))
573            .unwrap_or_else(|| "unbounded".to_string());
574        match last_error {
575            Some(mut error) => {
576                error.message = format!(
577                    "{} (retry max duration exceeded: {elapsed:?}/{max_duration_text})",
578                    error.message
579                );
580                error
581            }
582            None => HttpError::retry_max_elapsed_exceeded(format!(
583                "HTTP retry max duration exceeded before a retryable error was captured: {elapsed:?}/{max_duration_text}"
584            )),
585        }
586    }
587
588    /// Maps a [`qubit_retry::RetryError`] into this crate's HTTP error model.
589    ///
590    /// # Parameters
591    /// - `error`: Terminal retry error from `qubit-retry`.
592    /// - `started_at`: Monotonic start instant of the HTTP retry flow.
593    /// - `max_duration`: Optional HTTP total retry budget.
594    /// - `max_attempts`: Configured maximum HTTP attempts.
595    ///
596    /// # Returns
597    /// A rich [`HttpError`] preserving the last attempt error when available.
598    fn map_retry_error(
599        error: RetryError<HttpError>,
600        started_at: Instant,
601        max_duration: Option<Duration>,
602        max_attempts: u32,
603    ) -> HttpError {
604        let attempts = error.attempts();
605        let reason = error.reason();
606        let (_, last_failure, _) = error.into_parts();
607        let last_error = last_failure.and_then(AttemptFailure::into_error);
608
609        match reason {
610            RetryErrorReason::AttemptsExceeded => last_error
611                .map(|error| Self::map_retry_attempts_exhausted(error, attempts, max_attempts))
612                .unwrap_or_else(|| {
613                    HttpError::retry_aborted(format!(
614                        "HTTP retry attempts exhausted without a captured HTTP error: {attempts}/{max_attempts}"
615                    ))
616                }),
617            RetryErrorReason::MaxOperationElapsedExceeded
618            | RetryErrorReason::MaxTotalElapsedExceeded => {
619                Self::map_retry_max_duration_exceeded(started_at, max_duration, last_error)
620            }
621            RetryErrorReason::Aborted => match last_error {
622                Some(error) if error.kind == crate::HttpErrorKind::Cancelled => error,
623                Some(error) => Self::map_retry_aborted(error, attempts, started_at),
624                None => HttpError::retry_aborted(format!(
625                    "HTTP retry aborted after {attempts} attempt(s) without a captured HTTP error"
626                )),
627            },
628            RetryErrorReason::UnsupportedOperation | RetryErrorReason::WorkerStillRunning => {
629                HttpError::other(format!(
630                    "HTTP retry executor failed after {attempts} attempt(s): {reason:?}"
631                ))
632            }
633        }
634    }
635
636    /// Exposes retry error mapping to coverage-only integration tests.
637    ///
638    /// # Parameters
639    /// - `error`: Synthetic terminal retry error.
640    /// - `started_at`: Monotonic retry-flow start instant.
641    /// - `max_duration`: Optional HTTP max-duration budget.
642    /// - `max_attempts`: Configured maximum attempts.
643    ///
644    /// # Returns
645    /// HTTP error mapped by [`Self::map_retry_error`].
646    #[cfg(coverage)]
647    #[doc(hidden)]
648    pub(crate) fn coverage_map_retry_error(
649        error: RetryError<HttpError>,
650        started_at: Instant,
651        max_duration: Option<Duration>,
652        max_attempts: u32,
653    ) -> HttpError {
654        Self::map_retry_error(error, started_at, max_duration, max_attempts)
655    }
656
657    /// Exercises the low-level pre-send cancellation check for coverage tests.
658    ///
659    /// # Returns
660    /// Error kind returned before any network I/O starts.
661    #[cfg(coverage)]
662    #[doc(hidden)]
663    pub(crate) async fn coverage_prepare_cancelled_error() -> crate::HttpErrorKind {
664        let client = crate::HttpClientFactory::new()
665            .create_default()
666            .expect("coverage HTTP client should build");
667        let token = tokio_util::sync::CancellationToken::new();
668        token.cancel();
669        let mut request = client
670            .request(
671                http::Method::GET,
672                "https://example.com/cancelled-before-send",
673            )
674            .build();
675        request.set_cancellation_token(token);
676
677        client
678            .prepare_and_send_once(request, "coverage request cancelled before send")
679            .await
680            .expect_err("pre-cancelled request should fail before send")
681            .kind
682    }
683
684    /// Builds a cancellation error for retry wait cancellation.
685    ///
686    /// # Parameters
687    /// - `message`: Human-readable cancellation reason.
688    /// - `method`: Request method to attach.
689    /// - `url`: Optional resolved request URL to attach.
690    ///
691    /// # Returns
692    /// [`HttpErrorKind::Cancelled`](crate::HttpErrorKind::Cancelled) with request
693    /// context.
694    fn retry_cancelled_error(
695        message: &str,
696        method: &http::Method,
697        url: Option<&url::Url>,
698    ) -> HttpError {
699        let mut error = HttpError::cancelled(message).with_method(method);
700        if let Some(url) = url {
701            error = error.with_url(url);
702        }
703        error
704    }
705
706    /// Opens an SSE stream and reconnects automatically on retryable stream
707    /// failures.
708    ///
709    /// Reconnect behavior:
710    /// - retryable transport/read failures trigger reconnects;
711    /// - optional reconnect on clean EOF (`reconnect_on_eof`);
712    /// - `Last-Event-ID` is set from the latest parsed SSE `id:` field;
713    /// - optional use of SSE `retry:` as next reconnect delay.
714    ///
715    /// # Parameters
716    /// - `request`: SSE request template reused on reconnect.
717    /// - `options`: Reconnect limits and delay policy.
718    ///
719    /// # Returns
720    /// SSE event stream yielding events from one or more reconnect sessions.
721    ///
722    /// # Errors
723    /// Stream items are `Result`; `Err` covers per-item failures such as:
724    /// - initial stream-open failures when not reconnectable or retries exhausted;
725    /// - SSE protocol errors (non-reconnectable by default);
726    /// - transport/read errors after reconnect budget is exhausted.
727    ///
728    /// # Side effects
729    /// Performs repeated HTTP requests and reads on reconnect; may sleep between
730    /// attempts according to reconnect options.
731    pub fn execute_sse_with_reconnect(
732        &self,
733        request: HttpRequest,
734        options: SseReconnectOptions,
735    ) -> SseEventStream {
736        SseReconnectRunner::new(self.clone(), request, options).run()
737    }
738}
739
740impl std::fmt::Debug for HttpClient {
741    /// Formats the client for debugging (exposes options and injectors; omits
742    /// the backend client).
743    ///
744    /// # Parameters
745    /// - `f`: Destination formatter.
746    ///
747    /// # Returns
748    /// `fmt::Result` from writing the debug struct.
749    ///
750    /// # Errors
751    /// Returns an error if formatting to `f` fails.
752    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753        f.debug_struct("HttpClient")
754            .field("options", &self.options)
755            .field("injectors", &self.injectors)
756            .field("async_injectors", &self.async_injectors)
757            .field("request_interceptors", &self.request_interceptors)
758            .field("response_interceptors", &self.response_interceptors)
759            .finish_non_exhaustive()
760    }
761}