qubit_http/client/http_client.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9//! HTTP client: builds requests, applies defaults and interceptors, executes
10//! them with optional retry, and exposes SSE helpers with reconnect.
11//!
12//! Single-shot execution is [`HttpClient::execute`] / [`HttpClient::execute_once`];
13//! retry policy comes from [`crate::HttpClientOptions::retry`] unless overridden
14//! per request.
15//!
16//! # Author
17//!
18//! Haixing Hu
19
20use std::time::{Duration, Instant};
21
22use qubit_retry::{
23 AttemptFailure, AttemptFailureDecision, Retry, RetryContext, RetryError, RetryErrorReason,
24};
25
26use crate::sse::SseReconnectRunner;
27use crate::{
28 response::HttpResponseOptions,
29 sse::{SseEventStream, SseReconnectOptions},
30 AsyncHttpHeaderInjector, HttpClientOptions, HttpError, HttpHeaderInjector, HttpLogger,
31 HttpRequest, HttpRequestBuilder, HttpRequestInterceptor, HttpRequestInterceptors, HttpResponse,
32 HttpResponseInterceptor, HttpResponseInterceptors, HttpResponseMeta, HttpResult,
33 HttpRetryOptions,
34};
35
36/// High-level HTTP client: default headers, injectors, interceptors, logging,
37/// timeouts, and optional per-request retry.
38///
39/// [`Clone`] is shallow and cheap enough for typical use (including passing into
40/// retry closures); cloning does not duplicate the underlying connection pool
41/// beyond what [`reqwest::Client`] already shares.
42#[derive(Clone)]
43pub struct HttpClient {
44 /// Pluggable low-level HTTP stack used to send requests (currently reqwest).
45 pub(super) backend: reqwest::Client,
46 /// Timeouts, proxy, logging, default headers, and related settings.
47 pub(super) options: HttpClientOptions,
48 /// Header injectors applied to every outgoing request after default
49 /// headers.
50 pub(super) injectors: Vec<HttpHeaderInjector>,
51 /// Async header injectors applied after sync injectors and before request-level headers.
52 pub(super) async_injectors: Vec<AsyncHttpHeaderInjector>,
53 /// Request interceptors applied before request send for each attempt.
54 request_interceptors: HttpRequestInterceptors,
55 /// Response interceptors applied on successful responses before return.
56 response_interceptors: HttpResponseInterceptors,
57}
58
59impl HttpClient {
60 /// Wraps a built [`reqwest::Client`] with the given options and an empty
61 /// injector list.
62 ///
63 /// # Parameters
64 /// - `backend`: Configured low-level HTTP client used for I/O.
65 /// - `options`: Client-wide timeouts, headers, proxy, logging, etc.
66 ///
67 /// # Returns
68 /// A new [`HttpClient`] with no injectors until
69 /// [`HttpClient::add_header_injector`] is called.
70 pub(crate) fn new(backend: reqwest::Client, options: HttpClientOptions) -> Self {
71 Self {
72 backend,
73 options,
74 injectors: Vec::new(),
75 async_injectors: Vec::new(),
76 request_interceptors: HttpRequestInterceptors::new(),
77 response_interceptors: HttpResponseInterceptors::new(),
78 }
79 }
80
81 /// Returns a reference to the client-wide options (timeouts, proxy, logging,
82 /// default headers, retry defaults, etc.).
83 ///
84 /// # Returns
85 /// Immutable borrow of [`HttpClientOptions`]. Never `None`; always the
86 /// options installed on this client.
87 pub fn options(&self) -> &HttpClientOptions {
88 &self.options
89 }
90
91 /// Appends a [`HttpHeaderInjector`] so its mutation function runs on every
92 /// request. Mutates `self` in place.
93 ///
94 /// # Parameters
95 /// - `injector`: Injector to append (order is preserved).
96 pub fn add_header_injector(&mut self, injector: HttpHeaderInjector) {
97 self.injectors.push(injector);
98 }
99
100 /// Appends an async header injector whose mutation runs after sync injectors.
101 /// Mutates `self` in place.
102 ///
103 /// # Parameters
104 /// - `injector`: Async injector to append (order is preserved).
105 pub fn add_async_header_injector(&mut self, injector: AsyncHttpHeaderInjector) {
106 self.async_injectors.push(injector);
107 }
108
109 /// Appends a request interceptor run before each send attempt (including
110 /// each retry attempt). Mutates `self` in place.
111 ///
112 /// # Parameters
113 /// - `interceptor`: Request interceptor to append (order is preserved).
114 pub fn add_request_interceptor(&mut self, interceptor: HttpRequestInterceptor) {
115 self.request_interceptors.push(interceptor);
116 }
117
118 /// Appends a response interceptor run only after a successful HTTP status
119 /// (after the internal `execute_once` step) and before response body logging.
120 /// Mutates `self` in place.
121 ///
122 /// # Parameters
123 /// - `interceptor`: Response interceptor to append (order is preserved).
124 pub fn add_response_interceptor(&mut self, interceptor: HttpResponseInterceptor) {
125 self.response_interceptors.push(interceptor);
126 }
127
128 /// Validates and adds one client-level default header.
129 ///
130 /// The header is applied to every request before header injectors and
131 /// request-level headers.
132 ///
133 /// # Parameters
134 /// - `name`: Header name.
135 /// - `value`: Header value.
136 ///
137 /// # Returns
138 /// `Ok(self)` after the header is stored.
139 ///
140 /// # Errors
141 /// Returns [`HttpError`] when the header name or value is invalid.
142 pub fn add_header(&mut self, name: &str, value: &str) -> HttpResult<&mut Self> {
143 self.options.add_header(name, value)?;
144 Ok(self)
145 }
146
147 /// Validates and adds many client-level default headers atomically.
148 ///
149 /// If any input pair is invalid, no header from this batch is applied.
150 ///
151 /// # Parameters
152 /// - `headers`: Iterator of `(name, value)` pairs.
153 ///
154 /// # Returns
155 /// `Ok(self)` after all headers are stored.
156 ///
157 /// # Errors
158 /// Returns [`HttpError`] when any name/value pair is invalid (nothing from
159 /// this call is applied).
160 pub fn add_headers(&mut self, headers: &[(&str, &str)]) -> HttpResult<&mut Self> {
161 self.options.add_headers(headers)?;
162 Ok(self)
163 }
164
165 /// Clears all synchronous header injectors. Mutates `self` in place.
166 pub fn clear_header_injectors(&mut self) {
167 self.injectors.clear();
168 }
169
170 /// Clears all async header injectors. Mutates `self` in place.
171 pub fn clear_async_header_injectors(&mut self) {
172 self.async_injectors.clear();
173 }
174
175 /// Clears all request interceptors. Mutates `self` in place.
176 pub fn clear_request_interceptors(&mut self) {
177 self.request_interceptors.clear();
178 }
179
180 /// Clears all response interceptors. Mutates `self` in place.
181 pub fn clear_response_interceptors(&mut self) {
182 self.response_interceptors.clear();
183 }
184
185 /// Starts building an [`HttpRequest`] with the given method and path
186 /// (relative or absolute URL string).
187 ///
188 /// # Parameters
189 /// - `method`: HTTP verb (GET, POST, …).
190 /// - `path`: Path relative to [`HttpClientOptions::base_url`] or a full URL
191 /// string.
192 ///
193 /// # Returns
194 /// A new [`HttpRequestBuilder`] borrowing this client for defaults; it is
195 /// not sent until built and passed to [`HttpClient::execute`] (or related
196 /// APIs).
197 pub fn request(&self, method: http::Method, path: &str) -> HttpRequestBuilder {
198 HttpRequestBuilder::new(method, path, self)
199 }
200
201 /// Returns a clone of the client-level default header map.
202 ///
203 /// Used when constructing a built [`HttpRequest`] so the snapshot reflects
204 /// headers at build time.
205 ///
206 /// # Returns
207 /// Owned [`http::HeaderMap`] copy of [`HttpClientOptions`] default headers.
208 pub(crate) fn headers_snapshot(&self) -> http::HeaderMap {
209 self.options.default_headers.clone()
210 }
211
212 /// Returns a clone of the registered synchronous header injectors list.
213 ///
214 /// # Returns
215 /// New [`Vec`] with the same injectors and order as on this client.
216 pub(crate) fn injectors_snapshot(&self) -> Vec<HttpHeaderInjector> {
217 self.injectors.clone()
218 }
219
220 /// Returns a clone of the registered async header injectors list.
221 ///
222 /// # Returns
223 /// New [`Vec`] with the same injectors and order as on this client.
224 pub(crate) fn async_injectors_snapshot(&self) -> Vec<AsyncHttpHeaderInjector> {
225 self.async_injectors.clone()
226 }
227
228 /// Sends the request and returns a unified [`HttpResponse`].
229 ///
230 /// Chooses retry vs single attempt from resolved [`HttpRetryOptions`] for
231 /// this request. Performs network I/O and may await the internal
232 /// `execute_once` path
233 /// multiple times with backoff between attempts when retry is enabled.
234 ///
235 /// # Parameters
236 /// - `request`: Built request (URL resolved against `base_url` if path is
237 /// not absolute).
238 ///
239 /// # Returns
240 /// - `Ok(HttpResponse)` when the HTTP status is success
241 /// ([`http::StatusCode::is_success`]).
242 /// - `Err(HttpError)` when any attempt fails for URL/header validation,
243 /// cancellation, interceptor failure, transport/timeout, non-success
244 /// status, or when the retry executor aborts or exceeds limits.
245 pub async fn execute(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
246 let retry_options = self.options.retry.resolve(&request);
247 if retry_options.should_retry(&request) {
248 self.execute_with_retry(request, retry_options).await
249 } else {
250 self.execute_once(request).await
251 }
252 }
253
254 /// Performs one non-retrying execution: pre-send cancellation check,
255 /// request interceptors, resolve URL, merge headers, log the request, send
256 /// with configured timeouts, map non-success status to an error, then
257 /// response interceptors and response logging. The returned body is read
258 /// lazily according to [`HttpResponse`].
259 ///
260 /// # Parameters
261 /// - `request`: Built request to send (same fields as for
262 /// [`HttpClient::execute`]).
263 ///
264 /// # Returns
265 /// - `Ok(HttpResponse)` on success status and after interceptors/logging
266 /// steps succeed.
267 /// - `Err(HttpError)` from request/response interceptors, cancellation,
268 /// send/transport errors, status mapping, URL resolution for the response
269 /// wrapper, or response logging failures.
270 ///
271 /// # Side effects
272 /// Network I/O, optional logging, and user-provided interceptor callbacks.
273 pub(crate) async fn execute_once(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
274 let mut request = request;
275 if let Some(error) = request.cancelled_error_if_needed("Request cancelled before sending") {
276 return Err(error);
277 }
278 self.request_interceptors.apply(&mut request)?;
279 let response = self
280 .prepare_and_send_once(request, "Request cancelled before sending")
281 .await?;
282 let mut response = response
283 .into_success_or_status_error("HTTP request failed")
284 .await?;
285 self.response_interceptors.apply(&mut response.meta)?;
286 let logger = HttpLogger::new(&self.options);
287 logger.log_response(&mut response).await?;
288 Ok(response)
289 }
290
291 /// Single low-level send: cancellation check, request logging, one backend
292 /// round-trip, then wraps the backend response as [`HttpResponse`].
293 ///
294 /// Does not run response interceptors or success-status enforcement; those
295 /// happen in [`HttpClient::execute_once`] after this returns.
296 ///
297 /// # Parameters
298 /// - `request`: Request to send (may be mutated for logging/send path).
299 /// - `cancellation_message`: Message embedded if the request is already
300 /// cancelled when this runs.
301 ///
302 /// # Returns
303 /// - `Ok(HttpResponse)` with lazy body and metadata.
304 /// - `Err(HttpError)` if cancelled before send, send fails, or
305 /// [`HttpRequest::resolved_url`] fails when building the wrapper.
306 ///
307 /// # Side effects
308 /// Async network I/O and request logging via [`HttpLogger`].
309 async fn prepare_and_send_once(
310 &self,
311 request: HttpRequest,
312 cancellation_message: &str,
313 ) -> HttpResult<HttpResponse> {
314 let mut request = request;
315 if let Some(error) = request.cancelled_error_if_needed(cancellation_message) {
316 return Err(error);
317 }
318 let logger = HttpLogger::new(&self.options);
319 let backend_response = request.send_impl(&self.backend, &logger).await?;
320 let meta = HttpResponseMeta::new(
321 backend_response.status(),
322 backend_response.headers().clone(),
323 backend_response.url().clone(),
324 request.method().clone(),
325 );
326 let response_options = HttpResponseOptions::new(
327 self.options.error_response_preview_limit,
328 self.options.sse_json_mode,
329 self.options.sse_max_line_bytes,
330 self.options.sse_max_frame_bytes,
331 self.options.sse_done_marker_policy.clone(),
332 );
333 Ok(HttpResponse::from_backend(
334 meta,
335 backend_response,
336 request.read_timeout(),
337 request.cancellation_token().cloned(),
338 request.resolved_url_with_query()?,
339 response_options,
340 ))
341 }
342
343 /// Runs [`HttpClient::execute_once`] under the given retry policy.
344 ///
345 /// Between attempts waits according to the resolved retry delay, optionally
346 /// honoring `Retry-After` by extending the next sleep. Each attempt clones
347 /// the request so request bodies can be rebuilt when supported.
348 ///
349 /// # Parameters
350 /// - `request`: Built request passed to each [`HttpClient::execute_once`]
351 /// attempt (cloned per retry closure).
352 /// - `options`: Effective retry options for this request (from resolution
353 /// in [`HttpClient::execute`]).
354 ///
355 /// # Returns
356 /// - `Ok(HttpResponse)` when an attempt completes with success status.
357 /// - `Err(HttpError)` from retry option validation, from any
358 /// [`HttpClient::execute_once`] failure that is non-retryable, or from
359 /// retry exhaustion/max-duration enforcement.
360 ///
361 /// # Side effects
362 /// Multiple async HTTP attempts and optional sleeps.
363 async fn execute_with_retry(
364 &self,
365 request: HttpRequest,
366 options: HttpRetryOptions,
367 ) -> HttpResult<HttpResponse> {
368 let honor_retry_after = request.retry_override().should_honor_retry_after();
369 let retry_options = options.to_executor_options()?;
370 let started_at = Instant::now();
371
372 let retry_policy_options = options.clone();
373 let retry_delay_options = retry_options.clone();
374 let retry_policy = match Retry::<HttpError>::builder()
375 .options(retry_options)
376 .retry_after_from_error(move |error| {
377 honor_retry_after.then_some(error.retry_after).flatten()
378 })
379 .on_failure(
380 move |failure: &AttemptFailure<HttpError>, context: &RetryContext| {
381 Self::retry_failure_decision(
382 failure,
383 context,
384 &retry_policy_options,
385 &retry_delay_options,
386 )
387 },
388 )
389 .build()
390 {
391 Ok(retry_policy) => retry_policy,
392 Err(error) => {
393 return Err(HttpError::other(format!(
394 "Invalid HTTP retry executor: {error}"
395 )))
396 }
397 };
398
399 let cancellation_token = request.cancellation_token().cloned();
400 let request_method = request.method().clone();
401 let request_url = request.resolved_url_with_query().ok();
402 let retry_request = request.clone();
403 let retry_future = retry_policy.run_async(|| {
404 let attempt_request = retry_request.clone();
405 async move { self.execute_once(attempt_request).await }
406 });
407
408 let retry_result = if let Some(token) = cancellation_token.as_ref() {
409 tokio::select! {
410 _ = token.cancelled() => {
411 return Err(Self::retry_cancelled_error(
412 "HTTP retry cancelled while waiting before next attempt",
413 &request_method,
414 request_url.as_ref(),
415 ));
416 }
417 result = retry_future => result,
418 }
419 } else {
420 retry_future.await
421 };
422
423 match retry_result {
424 Ok(response) => Ok(response),
425 Err(error) => Err(Self::map_retry_error(
426 error,
427 started_at,
428 options.max_duration,
429 options.max_attempts,
430 )),
431 }
432 }
433
434 /// Returns whether `error` is retryable under `options`.
435 ///
436 /// # Parameters
437 /// - `error`: Error produced by a single HTTP attempt.
438 /// - `options`: Effective retry options for the request.
439 ///
440 /// # Returns
441 /// `true` if another attempt may be scheduled.
442 fn is_retryable_error(error: &HttpError, options: &HttpRetryOptions) -> bool {
443 if error.kind == crate::HttpErrorKind::Status {
444 error
445 .status
446 .is_some_and(|status| options.is_retryable_status(status))
447 } else {
448 options.is_retryable_error_kind(error.kind)
449 }
450 }
451
452 /// Computes the sleep before the next retry attempt.
453 ///
454 /// # Parameters
455 /// - `base_delay`: Delay selected from retry policy and jitter.
456 /// - `retry_after_hint`: Retry-After delay extracted by the retry policy.
457 ///
458 /// # Returns
459 /// `base_delay`, or the larger `Retry-After` value when present.
460 fn retry_sleep_delay(base_delay: Duration, retry_after_hint: Option<Duration>) -> Duration {
461 retry_after_hint
462 .map(|retry_after| retry_after.max(base_delay))
463 .unwrap_or(base_delay)
464 }
465
466 /// Decides how HTTP retry should handle one failed attempt.
467 ///
468 /// # Parameters
469 /// - `failure`: Failed attempt reported by `qubit-retry`.
470 /// - `context`: Retry context for the failed attempt.
471 /// - `policy_options`: HTTP retry allowlists and method policy.
472 /// - `delay_options`: Retry executor options used to calculate base delay.
473 ///
474 /// # Returns
475 /// Decision for `qubit-retry`: abort non-retryable HTTP errors, otherwise
476 /// retry after the larger base delay / Retry-After hint. Non-HTTP runtime
477 /// failures fall back to the retry executor default.
478 fn retry_failure_decision(
479 failure: &AttemptFailure<HttpError>,
480 context: &RetryContext,
481 policy_options: &HttpRetryOptions,
482 delay_options: &qubit_retry::RetryOptions,
483 ) -> AttemptFailureDecision {
484 let Some(error) = failure.as_error() else {
485 return AttemptFailureDecision::UseDefault;
486 };
487 if !Self::is_retryable_error(error, policy_options) {
488 return AttemptFailureDecision::Abort;
489 }
490
491 let base_delay = delay_options.delay_for_attempt(context.attempt());
492 let sleep_delay = Self::retry_sleep_delay(base_delay, context.retry_after_hint());
493 AttemptFailureDecision::RetryAfter(sleep_delay)
494 }
495
496 /// Exposes retry failure decisions to coverage-only integration tests.
497 ///
498 /// # Parameters
499 /// - `failure`: Failed attempt to inspect.
500 /// - `context`: Retry context for the failed attempt.
501 /// - `policy_options`: HTTP retry policy options.
502 /// - `delay_options`: Retry executor options.
503 ///
504 /// # Returns
505 /// The retry decision selected by [`Self::retry_failure_decision`].
506 #[cfg(coverage)]
507 #[doc(hidden)]
508 pub(crate) fn coverage_retry_failure_decision(
509 failure: &AttemptFailure<HttpError>,
510 context: &RetryContext,
511 policy_options: &HttpRetryOptions,
512 delay_options: &qubit_retry::RetryOptions,
513 ) -> AttemptFailureDecision {
514 Self::retry_failure_decision(failure, context, policy_options, delay_options)
515 }
516
517 /// Adds retry-attempt exhaustion context to the last attempt error.
518 ///
519 /// # Parameters
520 /// - `error`: Last retryable attempt error.
521 /// - `attempts`: Number of attempts already made.
522 /// - `max_attempts`: Configured maximum attempts.
523 ///
524 /// # Returns
525 /// The same error with retry exhaustion details appended to its message.
526 fn map_retry_attempts_exhausted(
527 mut error: HttpError,
528 attempts: u32,
529 max_attempts: u32,
530 ) -> HttpError {
531 error.message = format!(
532 "{} (retry attempts exhausted: {attempts}/{max_attempts})",
533 error.message
534 );
535 error
536 }
537
538 /// Builds the error returned when retry policy stops early.
539 ///
540 /// # Parameters
541 /// - `error`: Attempt error that the retry policy chose not to retry.
542 /// - `attempts`: Number of attempts already made.
543 /// - `started_at`: Start instant of the retry flow.
544 ///
545 /// # Returns
546 /// [`HttpError::retry_aborted`] with the original [`HttpError`] chained as
547 /// source for callers that need the underlying status or transport error.
548 fn map_retry_aborted(error: HttpError, attempts: u32, started_at: Instant) -> HttpError {
549 let elapsed = started_at.elapsed();
550 let summary = error.message.clone();
551 HttpError::retry_aborted(format!(
552 "HTTP retry aborted after {attempts} attempt(s) in {elapsed:?}: {summary}"
553 ))
554 .with_source(error)
555 }
556
557 /// Builds the error when retry max-duration is exhausted.
558 ///
559 /// # Parameters
560 /// - `started_at`: Start instant of the retry flow.
561 /// - `max_duration`: Configured max-duration budget.
562 /// - `last_error`: Last captured retryable attempt error, if any.
563 ///
564 /// # Returns
565 /// Augments the last failure when present, otherwise a dedicated
566 /// max-duration error with no underlying attempt error.
567 fn map_retry_max_duration_exceeded(
568 started_at: Instant,
569 max_duration: Option<Duration>,
570 last_error: Option<HttpError>,
571 ) -> HttpError {
572 let elapsed = started_at.elapsed();
573 let max_duration_text = max_duration
574 .map(|duration| format!("{duration:?}"))
575 .unwrap_or_else(|| "unbounded".to_string());
576 match last_error {
577 Some(mut error) => {
578 error.message = format!(
579 "{} (retry max duration exceeded: {elapsed:?}/{max_duration_text})",
580 error.message
581 );
582 error
583 }
584 None => HttpError::retry_max_elapsed_exceeded(format!(
585 "HTTP retry max duration exceeded before a retryable error was captured: {elapsed:?}/{max_duration_text}"
586 )),
587 }
588 }
589
590 /// Maps a [`qubit_retry::RetryError`] into this crate's HTTP error model.
591 ///
592 /// # Parameters
593 /// - `error`: Terminal retry error from `qubit-retry`.
594 /// - `started_at`: Monotonic start instant of the HTTP retry flow.
595 /// - `max_duration`: Optional HTTP total retry budget.
596 /// - `max_attempts`: Configured maximum HTTP attempts.
597 ///
598 /// # Returns
599 /// A rich [`HttpError`] preserving the last attempt error when available.
600 fn map_retry_error(
601 error: RetryError<HttpError>,
602 started_at: Instant,
603 max_duration: Option<Duration>,
604 max_attempts: u32,
605 ) -> HttpError {
606 let attempts = error.attempts();
607 let reason = error.reason();
608 let (_, last_failure, _) = error.into_parts();
609 let last_error = last_failure.and_then(AttemptFailure::into_error);
610
611 match reason {
612 RetryErrorReason::AttemptsExceeded => last_error
613 .map(|error| Self::map_retry_attempts_exhausted(error, attempts, max_attempts))
614 .unwrap_or_else(|| {
615 HttpError::retry_aborted(format!(
616 "HTTP retry attempts exhausted without a captured HTTP error: {attempts}/{max_attempts}"
617 ))
618 }),
619 RetryErrorReason::MaxOperationElapsedExceeded
620 | RetryErrorReason::MaxTotalElapsedExceeded => {
621 Self::map_retry_max_duration_exceeded(started_at, max_duration, last_error)
622 }
623 RetryErrorReason::Aborted => match last_error {
624 Some(error) if error.kind == crate::HttpErrorKind::Cancelled => error,
625 Some(error) => Self::map_retry_aborted(error, attempts, started_at),
626 None => HttpError::retry_aborted(format!(
627 "HTTP retry aborted after {attempts} attempt(s) without a captured HTTP error"
628 )),
629 },
630 RetryErrorReason::UnsupportedOperation | RetryErrorReason::WorkerStillRunning => {
631 HttpError::other(format!(
632 "HTTP retry executor failed after {attempts} attempt(s): {reason:?}"
633 ))
634 }
635 }
636 }
637
638 /// Exposes retry error mapping to coverage-only integration tests.
639 ///
640 /// # Parameters
641 /// - `error`: Synthetic terminal retry error.
642 /// - `started_at`: Monotonic retry-flow start instant.
643 /// - `max_duration`: Optional HTTP max-duration budget.
644 /// - `max_attempts`: Configured maximum attempts.
645 ///
646 /// # Returns
647 /// HTTP error mapped by [`Self::map_retry_error`].
648 #[cfg(coverage)]
649 #[doc(hidden)]
650 pub(crate) fn coverage_map_retry_error(
651 error: RetryError<HttpError>,
652 started_at: Instant,
653 max_duration: Option<Duration>,
654 max_attempts: u32,
655 ) -> HttpError {
656 Self::map_retry_error(error, started_at, max_duration, max_attempts)
657 }
658
659 /// Exercises the low-level pre-send cancellation check for coverage tests.
660 ///
661 /// # Returns
662 /// Error kind returned before any network I/O starts.
663 #[cfg(coverage)]
664 #[doc(hidden)]
665 pub(crate) async fn coverage_prepare_cancelled_error() -> crate::HttpErrorKind {
666 let client = crate::HttpClientFactory::new()
667 .create_default()
668 .expect("coverage HTTP client should build");
669 let token = tokio_util::sync::CancellationToken::new();
670 token.cancel();
671 let mut request = client
672 .request(
673 http::Method::GET,
674 "https://example.com/cancelled-before-send",
675 )
676 .build();
677 request.set_cancellation_token(token);
678
679 client
680 .prepare_and_send_once(request, "coverage request cancelled before send")
681 .await
682 .expect_err("pre-cancelled request should fail before send")
683 .kind
684 }
685
686 /// Builds a cancellation error for retry wait cancellation.
687 ///
688 /// # Parameters
689 /// - `message`: Human-readable cancellation reason.
690 /// - `method`: Request method to attach.
691 /// - `url`: Optional resolved request URL to attach.
692 ///
693 /// # Returns
694 /// [`HttpErrorKind::Cancelled`](crate::HttpErrorKind::Cancelled) with request
695 /// context.
696 fn retry_cancelled_error(
697 message: &str,
698 method: &http::Method,
699 url: Option<&url::Url>,
700 ) -> HttpError {
701 let mut error = HttpError::cancelled(message).with_method(method);
702 if let Some(url) = url {
703 error = error.with_url(url);
704 }
705 error
706 }
707
708 /// Opens an SSE stream and reconnects automatically on retryable stream
709 /// failures.
710 ///
711 /// Reconnect behavior:
712 /// - retryable transport/read failures trigger reconnects;
713 /// - optional reconnect on clean EOF (`reconnect_on_eof`);
714 /// - `Last-Event-ID` is set from the latest parsed SSE `id:` field;
715 /// - optional use of SSE `retry:` as next reconnect delay.
716 ///
717 /// # Parameters
718 /// - `request`: SSE request template reused on reconnect.
719 /// - `options`: Reconnect limits and delay policy.
720 ///
721 /// # Returns
722 /// SSE event stream yielding events from one or more reconnect sessions.
723 ///
724 /// # Errors
725 /// Stream items are `Result`; `Err` covers per-item failures such as:
726 /// - initial stream-open failures when not reconnectable or retries exhausted;
727 /// - SSE protocol errors (non-reconnectable by default);
728 /// - transport/read errors after reconnect budget is exhausted.
729 ///
730 /// # Side effects
731 /// Performs repeated HTTP requests and reads on reconnect; may sleep between
732 /// attempts according to reconnect options.
733 pub fn execute_sse_with_reconnect(
734 &self,
735 request: HttpRequest,
736 options: SseReconnectOptions,
737 ) -> SseEventStream {
738 SseReconnectRunner::new(self.clone(), request, options).run()
739 }
740}
741
742impl std::fmt::Debug for HttpClient {
743 /// Formats the client for debugging (exposes options and injectors; omits
744 /// the backend client).
745 ///
746 /// # Parameters
747 /// - `f`: Destination formatter.
748 ///
749 /// # Returns
750 /// `fmt::Result` from writing the debug struct.
751 ///
752 /// # Errors
753 /// Returns an error if formatting to `f` fails.
754 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
755 f.debug_struct("HttpClient")
756 .field("options", &self.options)
757 .field("injectors", &self.injectors)
758 .field("async_injectors", &self.async_injectors)
759 .field("request_interceptors", &self.request_interceptors)
760 .field("response_interceptors", &self.response_interceptors)
761 .finish_non_exhaustive()
762 }
763}