qubit_http/client/http_client.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! HTTP client: builds requests, applies defaults and interceptors, executes
11//! them with optional retry, and exposes SSE helpers with reconnect.
12//!
//! [`HttpClient::execute`] is the entry point for sending a request: it runs
//! either a single attempt (`HttpClient::execute_once`) or a retried sequence
//! of attempts. The retry policy comes from [`crate::HttpClientOptions::retry`]
//! unless overridden per request.
16//!
17
18use std::time::{Duration, Instant};
19
20use qubit_retry::{
21 AttemptFailure, AttemptFailureDecision, Retry, RetryContext, RetryError, RetryErrorReason,
22};
23
24use crate::sse::SseReconnectRunner;
25use crate::{
26 response::HttpResponseOptions,
27 sse::{SseEventStream, SseReconnectOptions},
28 AsyncHttpHeaderInjector, HttpClientOptions, HttpError, HttpHeaderInjector, HttpLogger,
29 HttpRequest, HttpRequestBuilder, HttpRequestInterceptor, HttpRequestInterceptors, HttpResponse,
30 HttpResponseInterceptor, HttpResponseInterceptors, HttpResponseMeta, HttpResult,
31 HttpRetryOptions,
32};
33
34/// High-level HTTP client: default headers, injectors, interceptors, logging,
35/// timeouts, and optional per-request retry.
36///
37/// [`Clone`] is shallow and cheap enough for typical use (including passing into
38/// retry closures); cloning does not duplicate the underlying connection pool
39/// beyond what [`reqwest::Client`] already shares.
40#[derive(Clone)]
41pub struct HttpClient {
42 /// Pluggable low-level HTTP stack used to send requests (currently reqwest).
43 pub(super) backend: reqwest::Client,
44 /// Timeouts, proxy, logging, default headers, and related settings.
45 pub(super) options: HttpClientOptions,
46 /// Header injectors applied to every outgoing request after default
47 /// headers.
48 pub(super) injectors: Vec<HttpHeaderInjector>,
49 /// Async header injectors applied after sync injectors and before request-level headers.
50 pub(super) async_injectors: Vec<AsyncHttpHeaderInjector>,
51 /// Request interceptors applied before request send for each attempt.
52 request_interceptors: HttpRequestInterceptors,
53 /// Response interceptors applied on successful responses before return.
54 response_interceptors: HttpResponseInterceptors,
55}
56
57impl HttpClient {
58 /// Wraps a built [`reqwest::Client`] with the given options and an empty
59 /// injector list.
60 ///
61 /// # Parameters
62 /// - `backend`: Configured low-level HTTP client used for I/O.
63 /// - `options`: Client-wide timeouts, headers, proxy, logging, etc.
64 ///
65 /// # Returns
66 /// A new [`HttpClient`] with no injectors until
67 /// [`HttpClient::add_header_injector`] is called.
68 pub(crate) fn new(backend: reqwest::Client, options: HttpClientOptions) -> Self {
69 Self {
70 backend,
71 options,
72 injectors: Vec::new(),
73 async_injectors: Vec::new(),
74 request_interceptors: HttpRequestInterceptors::new(),
75 response_interceptors: HttpResponseInterceptors::new(),
76 }
77 }
78
79 /// Returns a reference to the client-wide options (timeouts, proxy, logging,
80 /// default headers, retry defaults, etc.).
81 ///
82 /// # Returns
83 /// Immutable borrow of [`HttpClientOptions`]. Never `None`; always the
84 /// options installed on this client.
85 pub fn options(&self) -> &HttpClientOptions {
86 &self.options
87 }
88
89 /// Appends a [`HttpHeaderInjector`] so its mutation function runs on every
90 /// request. Mutates `self` in place.
91 ///
92 /// # Parameters
93 /// - `injector`: Injector to append (order is preserved).
94 pub fn add_header_injector(&mut self, injector: HttpHeaderInjector) {
95 self.injectors.push(injector);
96 }
97
98 /// Appends an async header injector whose mutation runs after sync injectors.
99 /// Mutates `self` in place.
100 ///
101 /// # Parameters
102 /// - `injector`: Async injector to append (order is preserved).
103 pub fn add_async_header_injector(&mut self, injector: AsyncHttpHeaderInjector) {
104 self.async_injectors.push(injector);
105 }
106
107 /// Appends a request interceptor run before each send attempt (including
108 /// each retry attempt). Mutates `self` in place.
109 ///
110 /// # Parameters
111 /// - `interceptor`: Request interceptor to append (order is preserved).
112 pub fn add_request_interceptor(&mut self, interceptor: HttpRequestInterceptor) {
113 self.request_interceptors.push(interceptor);
114 }
115
116 /// Appends a response interceptor run only after a successful HTTP status
117 /// (after the internal `execute_once` step) and before response body logging.
118 /// Mutates `self` in place.
119 ///
120 /// # Parameters
121 /// - `interceptor`: Response interceptor to append (order is preserved).
122 pub fn add_response_interceptor(&mut self, interceptor: HttpResponseInterceptor) {
123 self.response_interceptors.push(interceptor);
124 }
125
126 /// Validates and adds one client-level default header.
127 ///
128 /// The header is applied to every request before header injectors and
129 /// request-level headers.
130 ///
131 /// # Parameters
132 /// - `name`: Header name.
133 /// - `value`: Header value.
134 ///
135 /// # Returns
136 /// `Ok(self)` after the header is stored.
137 ///
138 /// # Errors
139 /// Returns [`HttpError`] when the header name or value is invalid.
140 pub fn add_header(&mut self, name: &str, value: &str) -> HttpResult<&mut Self> {
141 self.options.add_header(name, value)?;
142 Ok(self)
143 }
144
145 /// Validates and adds many client-level default headers atomically.
146 ///
147 /// If any input pair is invalid, no header from this batch is applied.
148 ///
149 /// # Parameters
150 /// - `headers`: Iterator of `(name, value)` pairs.
151 ///
152 /// # Returns
153 /// `Ok(self)` after all headers are stored.
154 ///
155 /// # Errors
156 /// Returns [`HttpError`] when any name/value pair is invalid (nothing from
157 /// this call is applied).
158 pub fn add_headers(&mut self, headers: &[(&str, &str)]) -> HttpResult<&mut Self> {
159 self.options.add_headers(headers)?;
160 Ok(self)
161 }
162
163 /// Clears all synchronous header injectors. Mutates `self` in place.
164 pub fn clear_header_injectors(&mut self) {
165 self.injectors.clear();
166 }
167
168 /// Clears all async header injectors. Mutates `self` in place.
169 pub fn clear_async_header_injectors(&mut self) {
170 self.async_injectors.clear();
171 }
172
173 /// Clears all request interceptors. Mutates `self` in place.
174 pub fn clear_request_interceptors(&mut self) {
175 self.request_interceptors.clear();
176 }
177
178 /// Clears all response interceptors. Mutates `self` in place.
179 pub fn clear_response_interceptors(&mut self) {
180 self.response_interceptors.clear();
181 }
182
183 /// Starts building an [`HttpRequest`] with the given method and path
184 /// (relative or absolute URL string).
185 ///
186 /// # Parameters
187 /// - `method`: HTTP verb (GET, POST, …).
188 /// - `path`: Path relative to [`HttpClientOptions::base_url`] or a full URL
189 /// string.
190 ///
191 /// # Returns
192 /// A new [`HttpRequestBuilder`] borrowing this client for defaults; it is
193 /// not sent until built and passed to [`HttpClient::execute`] (or related
194 /// APIs).
195 pub fn request(&self, method: http::Method, path: &str) -> HttpRequestBuilder {
196 HttpRequestBuilder::new(method, path, self)
197 }
198
199 /// Returns a clone of the client-level default header map.
200 ///
201 /// Used when constructing a built [`HttpRequest`] so the snapshot reflects
202 /// headers at build time.
203 ///
204 /// # Returns
205 /// Owned [`http::HeaderMap`] copy of [`HttpClientOptions`] default headers.
206 pub(crate) fn headers_snapshot(&self) -> http::HeaderMap {
207 self.options.default_headers.clone()
208 }
209
210 /// Returns a clone of the registered synchronous header injectors list.
211 ///
212 /// # Returns
213 /// New [`Vec`] with the same injectors and order as on this client.
214 pub(crate) fn injectors_snapshot(&self) -> Vec<HttpHeaderInjector> {
215 self.injectors.clone()
216 }
217
218 /// Returns a clone of the registered async header injectors list.
219 ///
220 /// # Returns
221 /// New [`Vec`] with the same injectors and order as on this client.
222 pub(crate) fn async_injectors_snapshot(&self) -> Vec<AsyncHttpHeaderInjector> {
223 self.async_injectors.clone()
224 }
225
226 /// Sends the request and returns a unified [`HttpResponse`].
227 ///
228 /// Chooses retry vs single attempt from resolved [`HttpRetryOptions`] for
229 /// this request. Performs network I/O and may await the internal
230 /// `execute_once` path
231 /// multiple times with backoff between attempts when retry is enabled.
232 ///
233 /// # Parameters
234 /// - `request`: Built request (URL resolved against `base_url` if path is
235 /// not absolute).
236 ///
237 /// # Returns
238 /// - `Ok(HttpResponse)` when the HTTP status is success
239 /// ([`http::StatusCode::is_success`]).
240 /// - `Err(HttpError)` when any attempt fails for URL/header validation,
241 /// cancellation, interceptor failure, transport/timeout, non-success
242 /// status, or when the retry executor aborts or exceeds limits.
243 pub async fn execute(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
244 let retry_options = self.options.retry.resolve(&request);
245 if retry_options.should_retry(&request) {
246 self.execute_with_retry(request, retry_options).await
247 } else {
248 self.execute_once(request).await
249 }
250 }
251
252 /// Performs one non-retrying execution: pre-send cancellation check,
253 /// request interceptors, resolve URL, merge headers, log the request, send
254 /// with configured timeouts, map non-success status to an error, then
255 /// response interceptors and response logging. The returned body is read
256 /// lazily according to [`HttpResponse`].
257 ///
258 /// # Parameters
259 /// - `request`: Built request to send (same fields as for
260 /// [`HttpClient::execute`]).
261 ///
262 /// # Returns
263 /// - `Ok(HttpResponse)` on success status and after interceptors/logging
264 /// steps succeed.
265 /// - `Err(HttpError)` from request/response interceptors, cancellation,
266 /// send/transport errors, status mapping, URL resolution for the response
267 /// wrapper, or response logging failures.
268 ///
269 /// # Side effects
270 /// Network I/O, optional logging, and user-provided interceptor callbacks.
271 pub(crate) async fn execute_once(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
272 let mut request = request;
273 if let Some(error) = request.cancelled_error_if_needed("Request cancelled before sending") {
274 return Err(error);
275 }
276 self.request_interceptors.apply(&mut request)?;
277 let response = self
278 .prepare_and_send_once(request, "Request cancelled before sending")
279 .await?;
280 let mut response = response
281 .into_success_or_status_error("HTTP request failed")
282 .await?;
283 self.response_interceptors.apply(&mut response.meta)?;
284 let logger = HttpLogger::new(&self.options);
285 logger.log_response(&mut response).await?;
286 Ok(response)
287 }
288
289 /// Single low-level send: cancellation check, request logging, one backend
290 /// round-trip, then wraps the backend response as [`HttpResponse`].
291 ///
292 /// Does not run response interceptors or success-status enforcement; those
293 /// happen in [`HttpClient::execute_once`] after this returns.
294 ///
295 /// # Parameters
296 /// - `request`: Request to send (may be mutated for logging/send path).
297 /// - `cancellation_message`: Message embedded if the request is already
298 /// cancelled when this runs.
299 ///
300 /// # Returns
301 /// - `Ok(HttpResponse)` with lazy body and metadata.
302 /// - `Err(HttpError)` if cancelled before send, send fails, or
303 /// [`HttpRequest::resolved_url`] fails when building the wrapper.
304 ///
305 /// # Side effects
306 /// Async network I/O and request logging via [`HttpLogger`].
307 async fn prepare_and_send_once(
308 &self,
309 request: HttpRequest,
310 cancellation_message: &str,
311 ) -> HttpResult<HttpResponse> {
312 let mut request = request;
313 if let Some(error) = request.cancelled_error_if_needed(cancellation_message) {
314 return Err(error);
315 }
316 let logger = HttpLogger::new(&self.options);
317 let backend_response = request.send_impl(&self.backend, &logger).await?;
318 let meta = HttpResponseMeta::new(
319 backend_response.status(),
320 backend_response.headers().clone(),
321 backend_response.url().clone(),
322 request.method().clone(),
323 );
324 let response_options = HttpResponseOptions::new(
325 self.options.error_response_preview_limit,
326 self.options.sse_json_mode,
327 self.options.sse_max_line_bytes,
328 self.options.sse_max_frame_bytes,
329 self.options.sse_done_marker_policy.clone(),
330 );
331 Ok(HttpResponse::from_backend(
332 meta,
333 backend_response,
334 request.read_timeout(),
335 request.cancellation_token().cloned(),
336 request.resolved_url_with_query()?,
337 response_options,
338 ))
339 }
340
341 /// Runs [`HttpClient::execute_once`] under the given retry policy.
342 ///
343 /// Between attempts waits according to the resolved retry delay, optionally
344 /// honoring `Retry-After` by extending the next sleep. Each attempt clones
345 /// the request so request bodies can be rebuilt when supported.
346 ///
347 /// # Parameters
348 /// - `request`: Built request passed to each [`HttpClient::execute_once`]
349 /// attempt (cloned per retry closure).
350 /// - `options`: Effective retry options for this request (from resolution
351 /// in [`HttpClient::execute`]).
352 ///
353 /// # Returns
354 /// - `Ok(HttpResponse)` when an attempt completes with success status.
355 /// - `Err(HttpError)` from retry option validation, from any
356 /// [`HttpClient::execute_once`] failure that is non-retryable, or from
357 /// retry exhaustion/max-duration enforcement.
358 ///
359 /// # Side effects
360 /// Multiple async HTTP attempts and optional sleeps.
361 async fn execute_with_retry(
362 &self,
363 request: HttpRequest,
364 options: HttpRetryOptions,
365 ) -> HttpResult<HttpResponse> {
366 let honor_retry_after = request.retry_override().should_honor_retry_after();
367 let retry_options = options.to_executor_options()?;
368 let started_at = Instant::now();
369
370 let retry_policy_options = options.clone();
371 let retry_delay_options = retry_options.clone();
372 let retry_policy = match Retry::<HttpError>::builder()
373 .options(retry_options)
374 .retry_after_from_error(move |error| {
375 honor_retry_after.then_some(error.retry_after).flatten()
376 })
377 .on_failure(
378 move |failure: &AttemptFailure<HttpError>, context: &RetryContext| {
379 Self::retry_failure_decision(
380 failure,
381 context,
382 &retry_policy_options,
383 &retry_delay_options,
384 )
385 },
386 )
387 .build()
388 {
389 Ok(retry_policy) => retry_policy,
390 Err(error) => {
391 return Err(HttpError::other(format!(
392 "Invalid HTTP retry executor: {error}"
393 )))
394 }
395 };
396
397 let cancellation_token = request.cancellation_token().cloned();
398 let request_method = request.method().clone();
399 let request_url = request.resolved_url_with_query().ok();
400 let retry_request = request.clone();
401 let retry_future = retry_policy.run_async(|| {
402 let attempt_request = retry_request.clone();
403 async move { self.execute_once(attempt_request).await }
404 });
405
406 let retry_result = if let Some(token) = cancellation_token.as_ref() {
407 tokio::select! {
408 _ = token.cancelled() => {
409 return Err(Self::retry_cancelled_error(
410 "HTTP retry cancelled while waiting before next attempt",
411 &request_method,
412 request_url.as_ref(),
413 ));
414 }
415 result = retry_future => result,
416 }
417 } else {
418 retry_future.await
419 };
420
421 match retry_result {
422 Ok(response) => Ok(response),
423 Err(error) => Err(Self::map_retry_error(
424 error,
425 started_at,
426 options.max_duration,
427 options.max_attempts,
428 )),
429 }
430 }
431
432 /// Returns whether `error` is retryable under `options`.
433 ///
434 /// # Parameters
435 /// - `error`: Error produced by a single HTTP attempt.
436 /// - `options`: Effective retry options for the request.
437 ///
438 /// # Returns
439 /// `true` if another attempt may be scheduled.
440 fn is_retryable_error(error: &HttpError, options: &HttpRetryOptions) -> bool {
441 if error.kind == crate::HttpErrorKind::Status {
442 error
443 .status
444 .is_some_and(|status| options.is_retryable_status(status))
445 } else {
446 options.is_retryable_error_kind(error.kind)
447 }
448 }
449
/// Picks the delay to wait before the next retry attempt.
///
/// # Parameters
/// - `base_delay`: Delay computed by the retry policy.
/// - `retry_after_hint`: Optional `Retry-After` delay for the failure.
///
/// # Returns
/// The larger of `base_delay` and the hint, or `base_delay` when there is
/// no hint; the hint can therefore lengthen the wait but never shorten it.
fn retry_sleep_delay(base_delay: Duration, retry_after_hint: Option<Duration>) -> Duration {
    match retry_after_hint {
        Some(hint) => hint.max(base_delay),
        None => base_delay,
    }
}
463
464 /// Decides how HTTP retry should handle one failed attempt.
465 ///
466 /// # Parameters
467 /// - `failure`: Failed attempt reported by `qubit-retry`.
468 /// - `context`: Retry context for the failed attempt.
469 /// - `policy_options`: HTTP retry allowlists and method policy.
470 /// - `delay_options`: Retry executor options used to calculate base delay.
471 ///
472 /// # Returns
473 /// Decision for `qubit-retry`: abort non-retryable HTTP errors, otherwise
474 /// retry after the larger base delay / Retry-After hint. Non-HTTP runtime
475 /// failures fall back to the retry executor default.
476 fn retry_failure_decision(
477 failure: &AttemptFailure<HttpError>,
478 context: &RetryContext,
479 policy_options: &HttpRetryOptions,
480 delay_options: &qubit_retry::RetryOptions,
481 ) -> AttemptFailureDecision {
482 let Some(error) = failure.as_error() else {
483 return AttemptFailureDecision::UseDefault;
484 };
485 if !Self::is_retryable_error(error, policy_options) {
486 return AttemptFailureDecision::Abort;
487 }
488
489 let base_delay = delay_options.delay_for_attempt(context.attempt());
490 let sleep_delay = Self::retry_sleep_delay(base_delay, context.retry_after_hint());
491 AttemptFailureDecision::RetryAfter(sleep_delay)
492 }
493
/// Coverage-build shim that forwards to [`Self::retry_failure_decision`].
///
/// Only compiled under `cfg(coverage)` and hidden from the docs.
///
/// # Parameters
/// - `failure`: Failed attempt to classify.
/// - `context`: Retry context for the failed attempt.
/// - `policy_options`: HTTP retry policy options.
/// - `delay_options`: Retry executor options.
///
/// # Returns
/// Whatever [`Self::retry_failure_decision`] returns for the same inputs.
#[cfg(coverage)]
#[doc(hidden)]
pub(crate) fn coverage_retry_failure_decision(
    failure: &AttemptFailure<HttpError>,
    context: &RetryContext,
    policy_options: &HttpRetryOptions,
    delay_options: &qubit_retry::RetryOptions,
) -> AttemptFailureDecision {
    let decision =
        Self::retry_failure_decision(failure, context, policy_options, delay_options);
    decision
}
514
515 /// Adds retry-attempt exhaustion context to the last attempt error.
516 ///
517 /// # Parameters
518 /// - `error`: Last retryable attempt error.
519 /// - `attempts`: Number of attempts already made.
520 /// - `max_attempts`: Configured maximum attempts.
521 ///
522 /// # Returns
523 /// The same error with retry exhaustion details appended to its message.
524 fn map_retry_attempts_exhausted(
525 mut error: HttpError,
526 attempts: u32,
527 max_attempts: u32,
528 ) -> HttpError {
529 error.message = format!(
530 "{} (retry attempts exhausted: {attempts}/{max_attempts})",
531 error.message
532 );
533 error
534 }
535
536 /// Builds the error returned when retry policy stops early.
537 ///
538 /// # Parameters
539 /// - `error`: Attempt error that the retry policy chose not to retry.
540 /// - `attempts`: Number of attempts already made.
541 /// - `started_at`: Start instant of the retry flow.
542 ///
543 /// # Returns
544 /// [`HttpError::retry_aborted`] with the original [`HttpError`] chained as
545 /// source for callers that need the underlying status or transport error.
546 fn map_retry_aborted(error: HttpError, attempts: u32, started_at: Instant) -> HttpError {
547 let elapsed = started_at.elapsed();
548 let summary = error.message.clone();
549 HttpError::retry_aborted(format!(
550 "HTTP retry aborted after {attempts} attempt(s) in {elapsed:?}: {summary}"
551 ))
552 .with_source(error)
553 }
554
555 /// Builds the error when retry max-duration is exhausted.
556 ///
557 /// # Parameters
558 /// - `started_at`: Start instant of the retry flow.
559 /// - `max_duration`: Configured max-duration budget.
560 /// - `last_error`: Last captured retryable attempt error, if any.
561 ///
562 /// # Returns
563 /// Augments the last failure when present, otherwise a dedicated
564 /// max-duration error with no underlying attempt error.
565 fn map_retry_max_duration_exceeded(
566 started_at: Instant,
567 max_duration: Option<Duration>,
568 last_error: Option<HttpError>,
569 ) -> HttpError {
570 let elapsed = started_at.elapsed();
571 let max_duration_text = max_duration
572 .map(|duration| format!("{duration:?}"))
573 .unwrap_or_else(|| "unbounded".to_string());
574 match last_error {
575 Some(mut error) => {
576 error.message = format!(
577 "{} (retry max duration exceeded: {elapsed:?}/{max_duration_text})",
578 error.message
579 );
580 error
581 }
582 None => HttpError::retry_max_elapsed_exceeded(format!(
583 "HTTP retry max duration exceeded before a retryable error was captured: {elapsed:?}/{max_duration_text}"
584 )),
585 }
586 }
587
588 /// Maps a [`qubit_retry::RetryError`] into this crate's HTTP error model.
589 ///
590 /// # Parameters
591 /// - `error`: Terminal retry error from `qubit-retry`.
592 /// - `started_at`: Monotonic start instant of the HTTP retry flow.
593 /// - `max_duration`: Optional HTTP total retry budget.
594 /// - `max_attempts`: Configured maximum HTTP attempts.
595 ///
596 /// # Returns
597 /// A rich [`HttpError`] preserving the last attempt error when available.
598 fn map_retry_error(
599 error: RetryError<HttpError>,
600 started_at: Instant,
601 max_duration: Option<Duration>,
602 max_attempts: u32,
603 ) -> HttpError {
604 let attempts = error.attempts();
605 let reason = error.reason();
606 let (_, last_failure, _) = error.into_parts();
607 let last_error = last_failure.and_then(AttemptFailure::into_error);
608
609 match reason {
610 RetryErrorReason::AttemptsExceeded => last_error
611 .map(|error| Self::map_retry_attempts_exhausted(error, attempts, max_attempts))
612 .unwrap_or_else(|| {
613 HttpError::retry_aborted(format!(
614 "HTTP retry attempts exhausted without a captured HTTP error: {attempts}/{max_attempts}"
615 ))
616 }),
617 RetryErrorReason::MaxOperationElapsedExceeded
618 | RetryErrorReason::MaxTotalElapsedExceeded => {
619 Self::map_retry_max_duration_exceeded(started_at, max_duration, last_error)
620 }
621 RetryErrorReason::Aborted => match last_error {
622 Some(error) if error.kind == crate::HttpErrorKind::Cancelled => error,
623 Some(error) => Self::map_retry_aborted(error, attempts, started_at),
624 None => HttpError::retry_aborted(format!(
625 "HTTP retry aborted after {attempts} attempt(s) without a captured HTTP error"
626 )),
627 },
628 RetryErrorReason::UnsupportedOperation | RetryErrorReason::WorkerStillRunning => {
629 HttpError::other(format!(
630 "HTTP retry executor failed after {attempts} attempt(s): {reason:?}"
631 ))
632 }
633 }
634 }
635
/// Coverage-build shim that forwards to [`Self::map_retry_error`].
///
/// Only compiled under `cfg(coverage)` and hidden from the docs.
///
/// # Parameters
/// - `error`: Terminal retry error to map.
/// - `started_at`: Start instant of the retry flow.
/// - `max_duration`: Optional total retry time budget.
/// - `max_attempts`: Configured maximum number of attempts.
///
/// # Returns
/// Whatever [`Self::map_retry_error`] returns for the same inputs.
#[cfg(coverage)]
#[doc(hidden)]
pub(crate) fn coverage_map_retry_error(
    error: RetryError<HttpError>,
    started_at: Instant,
    max_duration: Option<Duration>,
    max_attempts: u32,
) -> HttpError {
    let mapped = Self::map_retry_error(error, started_at, max_duration, max_attempts);
    mapped
}
656
/// Coverage-build helper that drives the cancellation check at the top of
/// `prepare_and_send_once`.
///
/// A default client is created, a GET request is built and given an
/// already-cancelled token, and the request is passed to
/// `prepare_and_send_once`, which is expected to fail.
///
/// # Returns
/// The kind of the error returned for the pre-cancelled request.
///
/// # Panics
/// Panics if the default client cannot be built or if the send
/// unexpectedly succeeds.
#[cfg(coverage)]
#[doc(hidden)]
pub(crate) async fn coverage_prepare_cancelled_error() -> crate::HttpErrorKind {
    let client = crate::HttpClientFactory::new()
        .create_default()
        .expect("coverage HTTP client should build");

    // Cancel the token up front so the request is cancelled on entry.
    let cancelled_token = tokio_util::sync::CancellationToken::new();
    cancelled_token.cancel();

    let builder = client.request(
        http::Method::GET,
        "https://example.com/cancelled-before-send",
    );
    let mut request = builder.build();
    request.set_cancellation_token(cancelled_token);

    let outcome = client
        .prepare_and_send_once(request, "coverage request cancelled before send")
        .await;
    outcome
        .expect_err("pre-cancelled request should fail before send")
        .kind
}
683
684 /// Builds a cancellation error for retry wait cancellation.
685 ///
686 /// # Parameters
687 /// - `message`: Human-readable cancellation reason.
688 /// - `method`: Request method to attach.
689 /// - `url`: Optional resolved request URL to attach.
690 ///
691 /// # Returns
692 /// [`HttpErrorKind::Cancelled`](crate::HttpErrorKind::Cancelled) with request
693 /// context.
694 fn retry_cancelled_error(
695 message: &str,
696 method: &http::Method,
697 url: Option<&url::Url>,
698 ) -> HttpError {
699 let mut error = HttpError::cancelled(message).with_method(method);
700 if let Some(url) = url {
701 error = error.with_url(url);
702 }
703 error
704 }
705
706 /// Opens an SSE stream and reconnects automatically on retryable stream
707 /// failures.
708 ///
709 /// Reconnect behavior:
710 /// - retryable transport/read failures trigger reconnects;
711 /// - optional reconnect on clean EOF (`reconnect_on_eof`);
712 /// - `Last-Event-ID` is set from the latest parsed SSE `id:` field;
713 /// - optional use of SSE `retry:` as next reconnect delay.
714 ///
715 /// # Parameters
716 /// - `request`: SSE request template reused on reconnect.
717 /// - `options`: Reconnect limits and delay policy.
718 ///
719 /// # Returns
720 /// SSE event stream yielding events from one or more reconnect sessions.
721 ///
722 /// # Errors
723 /// Stream items are `Result`; `Err` covers per-item failures such as:
724 /// - initial stream-open failures when not reconnectable or retries exhausted;
725 /// - SSE protocol errors (non-reconnectable by default);
726 /// - transport/read errors after reconnect budget is exhausted.
727 ///
728 /// # Side effects
729 /// Performs repeated HTTP requests and reads on reconnect; may sleep between
730 /// attempts according to reconnect options.
731 pub fn execute_sse_with_reconnect(
732 &self,
733 request: HttpRequest,
734 options: SseReconnectOptions,
735 ) -> SseEventStream {
736 SseReconnectRunner::new(self.clone(), request, options).run()
737 }
738}
739
740impl std::fmt::Debug for HttpClient {
741 /// Formats the client for debugging (exposes options and injectors; omits
742 /// the backend client).
743 ///
744 /// # Parameters
745 /// - `f`: Destination formatter.
746 ///
747 /// # Returns
748 /// `fmt::Result` from writing the debug struct.
749 ///
750 /// # Errors
751 /// Returns an error if formatting to `f` fails.
752 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753 f.debug_struct("HttpClient")
754 .field("options", &self.options)
755 .field("injectors", &self.injectors)
756 .field("async_injectors", &self.async_injectors)
757 .field("request_interceptors", &self.request_interceptors)
758 .field("response_interceptors", &self.response_interceptors)
759 .finish_non_exhaustive()
760 }
761}