Skip to main content

qubit_http/
http_client.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9//! # HTTP Client
10//!
11//! Implements request execution and stream execution with unified behavior.
12//!
13//! # Author
14//!
15//! Haixing Hu
16
17use std::time::Duration;
18
19use async_stream::stream;
20use bytes::Bytes;
21use futures_util::StreamExt;
22use http::header::RETRY_AFTER;
23use http::{HeaderMap, StatusCode};
24use qubit_function::MutatingFunction;
25use qubit_retry::{
26    AttemptFailure, Jitter, RetryDecision, RetryError, RetryExecutor, RetryOptions, RetryResult,
27};
28use reqwest::Response;
29use tokio_util::sync::CancellationToken;
30use url::Host;
31use url::Url;
32
33use crate::{
34    AsyncHeaderInjector, HeaderInjector, HttpClientOptions, HttpError, HttpErrorKind, HttpLogger,
35    HttpRequest, HttpRequestBody, HttpRequestBuilder, HttpResponse, HttpResult, HttpRetryOptions,
36    HttpStreamResponse, RetryHint,
37};
38
/// High-level HTTP client that applies options, header injection, logging, and
/// timeouts.
///
/// The type is `Clone`; the retry paths clone the whole client once per
/// attempt, so every field is copied along with it.
#[derive(Clone)]
pub struct HttpClient {
    /// Low-level HTTP client used to send requests.
    client: reqwest::Client,
    /// Timeouts, proxy, logging, default headers, and related settings.
    options: HttpClientOptions,
    /// Synchronous header injectors, run in insertion order on every outgoing
    /// request after the default headers have been copied.
    injectors: Vec<HeaderInjector>,
    /// Async header injectors, run in insertion order after the sync injectors
    /// and before request-level headers are merged.
    async_injectors: Vec<AsyncHeaderInjector>,
}
53
54impl std::fmt::Debug for HttpClient {
55    /// Formats the client for debugging (exposes options and injectors; omits
56    /// the reqwest client).
57    ///
58    /// # Parameters
59    /// - `f`: Destination formatter.
60    ///
61    /// # Returns
62    /// `fmt::Result` from writing the debug struct.
63    ///
64    /// # Errors
65    /// Returns an error if formatting to `f` fails.
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct("HttpClient")
68            .field("options", &self.options)
69            .field("injectors", &self.injectors)
70            .field("async_injectors", &self.async_injectors)
71            .finish_non_exhaustive()
72    }
73}
74
75impl HttpClient {
    /// Wraps a built [`reqwest::Client`] with the given options and empty sync
    /// and async injector lists.
    ///
    /// # Parameters
    /// - `client`: Configured reqwest client used for I/O.
    /// - `options`: Client-wide timeouts, headers, proxy, logging, etc.
    ///
    /// # Returns
    /// A new [`HttpClient`] with no injectors until
    /// [`HttpClient::add_header_injector`] or
    /// [`HttpClient::add_async_header_injector`] is called.
    pub(crate) fn new(client: reqwest::Client, options: HttpClientOptions) -> Self {
        Self {
            client,
            options,
            injectors: Vec::new(),
            async_injectors: Vec::new(),
        }
    }
94
    /// Returns a reference to the client options (timeouts, proxy, logging,
    /// etc.) that this client was created with, including any default headers
    /// added later through [`HttpClient::add_header`].
    ///
    /// # Returns
    /// Immutable borrow of [`HttpClientOptions`].
    pub fn options(&self) -> &HttpClientOptions {
        &self.options
    }
103
    /// Appends a [`HeaderInjector`] so its mutation function runs on every
    /// request.
    ///
    /// Injectors run in the order they were added, after the default headers
    /// are copied and before any async injector.
    ///
    /// # Parameters
    /// - `injector`: Injector to append (order is preserved).
    ///
    /// # Returns
    /// Nothing.
    pub fn add_header_injector(&mut self, injector: HeaderInjector) {
        self.injectors.push(injector);
    }
115
    /// Appends an async header injector whose mutation runs after all sync
    /// injectors and before request-level headers are merged.
    ///
    /// # Parameters
    /// - `injector`: Async injector to append (order is preserved).
    ///
    /// # Returns
    /// Nothing.
    pub fn add_async_header_injector(&mut self, injector: AsyncHeaderInjector) {
        self.async_injectors.push(injector);
    }
126
    /// Validates and adds one client-level default header.
    ///
    /// The header is applied to every request before header injectors and
    /// request-level headers. Validation and storage are delegated to the
    /// client options.
    ///
    /// # Parameters
    /// - `name`: Header name.
    /// - `value`: Header value.
    ///
    /// # Returns
    /// `Ok(self)` after the header is stored, so calls can be chained.
    ///
    /// # Errors
    /// Returns [`HttpError`] when the header name or value is invalid.
    pub fn add_header(&mut self, name: &str, value: &str) -> HttpResult<&mut Self> {
        self.options.add_header(name, value)?;
        Ok(self)
    }
145
    /// Validates and adds many client-level default headers atomically.
    ///
    /// If any input pair is invalid, no header from this batch is applied.
    /// Validation and storage are delegated to the client options.
    ///
    /// # Parameters
    /// - `headers`: Iterator of `(name, value)` pairs.
    ///
    /// # Returns
    /// `Ok(self)` after all headers are stored, so calls can be chained.
    ///
    /// # Errors
    /// Returns [`HttpError`] when any name/value pair is invalid (nothing from
    /// this call is applied).
    pub fn add_headers<'a, I>(&mut self, headers: I) -> HttpResult<&mut Self>
    where
        I: IntoIterator<Item = (&'a str, &'a str)>,
    {
        self.options.add_headers(headers)?;
        Ok(self)
    }
166
    /// Removes all registered synchronous header injectors.
    ///
    /// Async injectors and default headers are left untouched.
    ///
    /// # Returns
    /// Nothing.
    pub fn clear_header_injectors(&mut self) {
        self.injectors.clear();
    }
174
    /// Removes all registered async header injectors.
    ///
    /// Synchronous injectors and default headers are left untouched.
    ///
    /// # Returns
    /// Nothing.
    pub fn clear_async_header_injectors(&mut self) {
        self.async_injectors.clear();
    }
182
    /// Starts building an [`HttpRequest`] with the given method and path
    /// (relative or absolute URL string).
    ///
    /// The builder is created from `method` and `path` only; no client state
    /// is read here.
    ///
    /// # Parameters
    /// - `method`: HTTP verb (GET, POST, …).
    /// - `path`: Path relative to [`HttpClientOptions::base_url`] or a full URL
    ///   string.
    ///
    /// # Returns
    /// A fresh [`HttpRequestBuilder`] not yet tied to this client until
    /// [`HttpRequestBuilder::build`] and [`HttpClient::execute`].
    pub fn request(&self, method: http::Method, path: &str) -> HttpRequestBuilder {
        HttpRequestBuilder::new(method, path)
    }
197
198    /// Sends the request, reads the full response body, logs per options, and
199    /// returns a buffered [`HttpResponse`].
200    ///
201    /// # Parameters
202    /// - `request`: Built request (URL resolved against `base_url` if path is
203    ///   not absolute).
204    ///
205    /// # Returns
206    /// - `Ok(HttpResponse)` when the HTTP status is success
207    ///   ([`http::StatusCode::is_success`]).
208    /// - `Err(HttpError)` on URL/header errors, transport failure, timeout, or
209    ///   non-success status.
210    pub async fn execute(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
211        let retry_options = self.resolve_retry_options(&request);
212        let honor_retry_after = request.retry_override.should_honor_retry_after();
213        if !self.should_retry_request(&request, &retry_options) {
214            return self.execute_once(request).await;
215        }
216        self.execute_with_retry(request, retry_options, honor_retry_after)
217            .await
218    }
219
    /// Performs one non-retrying execution: resolve URL, merge headers, log the
    /// request, send with write timeout, reject non-success status, read the
    /// full body with read timeout, then log the response.
    ///
    /// # Parameters
    /// - `request`: Built request to send (same fields as for
    ///   [`HttpClient::execute`]).
    ///
    /// # Returns
    /// Buffered [`HttpResponse`] or [`HttpError`].
    ///
    /// # Errors
    /// Returns [`HttpError`] when the URL cannot be resolved, the request is
    /// already cancelled, header building fails, sending fails or times out,
    /// the status is not a success, or reading the body fails or times out.
    async fn execute_once(&self, request: HttpRequest) -> HttpResult<HttpResponse> {
        let url = self.resolve_url(&request)?;
        let method = request.method.clone();
        let cancellation_token = request.cancellation_token.clone();
        // Fail fast when the caller cancelled before any header injector runs
        // or any network I/O starts.
        if let Some(error) = cancelled_request_error_if_needed(
            cancellation_token.as_ref(),
            &method,
            &url,
            "Request cancelled before sending",
        ) {
            return Err(error);
        }
        let headers = self.build_headers(&request).await?;

        // Capture the body for logging before `request.body` is moved into
        // the reqwest builder below.
        let body_for_log = clone_request_body_for_log(&request.body);

        let logger = HttpLogger::new(&self.options.logging, &self.options.sensitive_headers);
        logger.log_request(&method, &url, &headers, body_for_log.as_ref());

        let mut builder = self.client.request(method.clone(), url.clone());
        builder = builder.headers(headers);

        if !request.query.is_empty() {
            builder = builder.query(&request.query);
        }

        // A per-request timeout is only set when the request carries one.
        if let Some(timeout) = request.request_timeout {
            builder = builder.timeout(timeout);
        }

        builder = apply_request_body(builder, request.body);

        let response = self
            .send_with_write_timeout(
                builder,
                method.clone(),
                url.clone(),
                cancellation_token.as_ref(),
            )
            .await?;

        if !response.status().is_success() {
            let status = response.status();
            // Read the Retry-After hint before the response is consumed by the
            // preview read below.
            let retry_after = parse_retry_after(status, response.headers());
            let error = response.error_for_status_ref().expect_err(
                "non-success HTTP status must produce reqwest status error via error_for_status_ref",
            );
            // NOTE(review): this preview read is bounded by the read timeout but
            // is not raced against the cancellation token, unlike the send and
            // body-read phases.
            let body_preview = self.read_error_response_preview(response).await;
            let message = format!(
                "HTTP request failed with status {} for {} {}; response body preview: {}",
                status, method, url, body_preview
            );
            let mut mapped = map_reqwest_error(
                error,
                HttpErrorKind::Status,
                Some(method.clone()),
                Some(url.clone()),
            )
            .with_status(status)
            .with_response_body_preview(body_preview);
            if let Some(retry_after) = retry_after {
                mapped = mapped.with_retry_after(retry_after);
            }
            // Replace the mapped message with the one that includes the status,
            // method, URL, and body preview.
            mapped.message = message;
            return Err(mapped);
        }

        // Copy the metadata out first: reading the body consumes `response`.
        let status = response.status();
        let response_url = response.url().clone();
        let response_headers = response.headers().clone();

        let body = self
            .read_body_with_timeout(
                response,
                method.clone(),
                response_url.clone(),
                cancellation_token.as_ref(),
            )
            .await?;

        logger.log_response(status, &response_url, &response_headers, &body);

        Ok(HttpResponse::new(
            status,
            response_headers,
            body,
            response_url,
        ))
    }
319
320    /// Sends the request and returns headers plus a byte stream without
321    /// buffering the full body.
322    ///
323    /// # Parameters
324    /// - `request`: Same as [`HttpClient::execute`].
325    ///
326    /// # Returns
327    /// - `Ok(HttpStreamResponse)` with a stream that applies read timeout per
328    ///   options.
329    /// - `Err(HttpError)` before the stream starts (same cases as
330    ///   [`HttpClient::execute`] for the initial response).
331    pub async fn execute_stream(&self, request: HttpRequest) -> HttpResult<HttpStreamResponse> {
332        let retry_options = self.resolve_retry_options(&request);
333        let honor_retry_after = request.retry_override.should_honor_retry_after();
334        if !self.should_retry_request(&request, &retry_options) {
335            return self.execute_stream_once(request).await;
336        }
337        self.execute_stream_with_retry(request, retry_options, honor_retry_after)
338            .await
339    }
340
    /// Performs one non-retrying streaming execution: same setup as
    /// [`HttpClient::execute_once`], but on success wraps the body in a stream
    /// with per-chunk read timeouts instead of buffering the full body.
    ///
    /// # Parameters
    /// - `request`: Built request to send (same fields as for
    ///   [`HttpClient::execute_stream`]).
    ///
    /// # Returns
    /// [`HttpStreamResponse`] or [`HttpError`].
    ///
    /// # Errors
    /// Returns [`HttpError`] when the URL cannot be resolved, the request is
    /// already cancelled, header building fails, sending fails or times out,
    /// or the status is not a success. Failures after the stream has started
    /// are delivered as `Err` items of the stream instead.
    async fn execute_stream_once(&self, request: HttpRequest) -> HttpResult<HttpStreamResponse> {
        let url = self.resolve_url(&request)?;
        let method = request.method.clone();
        let cancellation_token = request.cancellation_token.clone();
        // Fail fast when the caller cancelled before any header injector runs
        // or any network I/O starts.
        if let Some(error) = cancelled_request_error_if_needed(
            cancellation_token.as_ref(),
            &method,
            &url,
            "Streaming request cancelled before sending",
        ) {
            return Err(error);
        }
        let headers = self.build_headers(&request).await?;

        // Capture the body for logging before `request.body` is moved into
        // the reqwest builder below.
        let body_for_log = clone_request_body_for_log(&request.body);

        let logger = HttpLogger::new(&self.options.logging, &self.options.sensitive_headers);
        logger.log_request(&method, &url, &headers, body_for_log.as_ref());

        let mut builder = self.client.request(method.clone(), url.clone());
        builder = builder.headers(headers);

        if !request.query.is_empty() {
            builder = builder.query(&request.query);
        }

        // A per-request timeout is only set when the request carries one.
        if let Some(timeout) = request.request_timeout {
            builder = builder.timeout(timeout);
        }

        builder = apply_request_body(builder, request.body);

        let response = self
            .send_with_write_timeout(
                builder,
                method.clone(),
                url.clone(),
                cancellation_token.as_ref(),
            )
            .await?;

        if !response.status().is_success() {
            let status = response.status();
            // Read the Retry-After hint before the response is consumed by the
            // preview read below.
            let retry_after = parse_retry_after(status, response.headers());
            let error = response.error_for_status_ref().expect_err(
                "non-success HTTP status must produce reqwest status error via error_for_status_ref",
            );
            // NOTE(review): this preview read is bounded by the read timeout but
            // is not raced against the cancellation token.
            let body_preview = self.read_error_response_preview(response).await;
            let message = format!(
                "HTTP streaming request failed with status {} for {} {}; response body preview: {}",
                status, method, url, body_preview
            );
            let mut mapped = map_reqwest_error(
                error,
                HttpErrorKind::Status,
                Some(method.clone()),
                Some(url.clone()),
            )
            .with_status(status)
            .with_response_body_preview(body_preview);
            if let Some(retry_after) = retry_after {
                mapped = mapped.with_retry_after(retry_after);
            }
            // Replace the mapped message with the one that includes the status,
            // method, URL, and body preview.
            mapped.message = message;
            return Err(mapped);
        }

        // Copy the metadata out first: `bytes_stream()` consumes `response`.
        let status = response.status();
        let response_url = response.url().clone();
        let response_headers = response.headers().clone();

        // Only the response head is logged here; the body is not buffered.
        logger.log_stream_response_headers(status, &response_url, &response_headers);

        // Owned copies moved into the stream so it does not borrow `self` or
        // the locals of this function.
        let read_timeout = self.options.timeouts.read_timeout;
        let method_for_err = method.clone();
        let url_for_err = response_url.clone();
        let cancellation_token_for_stream = cancellation_token.clone();

        let mut stream = response.bytes_stream();
        let wrapped = stream! {
            loop {
                // The read timeout applies to each chunk separately, not to the
                // stream as a whole. With a token present, cancellation is raced
                // against the next chunk.
                let next = if let Some(token) = &cancellation_token_for_stream {
                    tokio::select! {
                        _ = token.cancelled() => {
                            yield Err(HttpError::cancelled("Streaming response cancelled while reading body")
                                .with_method(method_for_err.clone())
                                .with_url(url_for_err.clone()));
                            break;
                        }
                        item = tokio::time::timeout(read_timeout, stream.next()) => item,
                    }
                } else {
                    tokio::time::timeout(read_timeout, stream.next()).await
                };
                match next {
                    Ok(Some(Ok(bytes))) => yield Ok(bytes),
                    // Transport error mid-stream: emit it and end the stream.
                    Ok(Some(Err(error))) => {
                        let mapped = map_reqwest_error(
                            error,
                            HttpErrorKind::Transport,
                            Some(method_for_err.clone()),
                            Some(url_for_err.clone()),
                        );
                        yield Err(mapped);
                        break;
                    }
                    // Upstream finished normally.
                    Ok(None) => break,
                    // No chunk arrived within `read_timeout`: emit a read
                    // timeout error and end the stream.
                    Err(_) => {
                        let error = HttpError::read_timeout(format!(
                            "Read timeout after {:?} while streaming response",
                            read_timeout
                        ))
                        .with_method(method_for_err.clone())
                        .with_url(url_for_err.clone());
                        yield Err(error);
                        break;
                    }
                }
            }
        };

        Ok(HttpStreamResponse::new_with_sse_options(
            status,
            response_headers,
            response_url,
            Box::pin(wrapped),
            self.options.sse_json_mode,
            self.options.sse_max_line_bytes,
            self.options.sse_max_frame_bytes,
        ))
    }
482
483    /// Returns whether the client should run the retry policy for this request.
484    ///
485    /// Retries are enabled when `max_attempts` is greater than one and the
486    /// request method is allowed by [`HttpClientOptions`] retry settings.
487    ///
488    /// # Parameters
489    /// - `request`: Request whose HTTP method is checked against the configured
490    ///   retry policy.
491    /// - `retry_options`: Effective retry options after applying request-level overrides.
492    fn should_retry_request(
493        &self,
494        request: &HttpRequest,
495        retry_options: &HttpRetryOptions,
496    ) -> bool {
497        retry_options.max_attempts > 1 && retry_options.allows_method(&request.method)
498    }
499
500    /// Resolves request-level retry override against client-level retry options.
501    ///
502    /// # Parameters
503    /// - `request`: Request whose override is applied.
504    ///
505    /// # Returns
506    /// Effective retry options for this request.
507    fn resolve_retry_options(&self, request: &HttpRequest) -> HttpRetryOptions {
508        let mut options = self.options.retry.clone();
509        options.enabled = request.retry_override.resolve_enabled(options.enabled);
510        options.method_policy = request
511            .retry_override
512            .resolve_method_policy(options.method_policy);
513        options
514    }
515
516    /// Builds a [`RetryExecutor`] from effective retry options and classifies
517    /// [`HttpError`] values using [`RetryHint`].
518    ///
519    /// # Parameters
520    /// - `retry_options`: Effective retry options for this request.
521    /// - `honor_retry_after`: Whether to honor `Retry-After` on `429`.
522    ///
523    /// # Returns
524    /// Configured executor or [`HttpError`] if retry options or executor
525    /// configuration is invalid.
526    fn build_retry_executor(
527        &self,
528        retry_options: &HttpRetryOptions,
529        honor_retry_after: bool,
530    ) -> HttpResult<RetryExecutor<HttpError>> {
531        let options = RetryOptions::new(
532            retry_options.max_attempts,
533            retry_options.max_duration,
534            retry_options.delay_strategy.clone(),
535            Jitter::factor(retry_options.jitter_factor),
536        )
537        .map_err(|error| HttpError::other(format!("Invalid HTTP retry options: {error}")))?;
538
539        let mut builder = RetryExecutor::<HttpError>::builder()
540            .options(options)
541            .classify_error(|error: &HttpError, _| {
542                if matches!(error.retry_hint(), RetryHint::Retryable) {
543                    RetryDecision::Retry
544                } else {
545                    RetryDecision::Abort
546                }
547            });
548        if honor_retry_after {
549            builder = builder.on_retry(|context, failure| {
550                let AttemptFailure::Error(error) = failure else {
551                    return;
552                };
553                let Some(retry_after) = error.retry_after else {
554                    return;
555                };
556                if retry_after > context.next_delay {
557                    std::thread::sleep(retry_after - context.next_delay);
558                }
559            });
560        }
561        builder
562            .build()
563            .map_err(|error| HttpError::other(format!("Invalid HTTP retry executor: {error}")))
564    }
565
566    /// Runs [`HttpClient::execute_once`] under the configured retry policy.
567    ///
568    /// # Parameters
569    /// - `request`: Built request passed to each [`HttpClient::execute_once`]
570    ///   attempt.
571    /// - `retry_options`: Effective retry options for this request.
572    /// - `honor_retry_after`: Whether to honor `Retry-After` on `429`.
573    ///
574    /// # Returns
575    /// Same as a successful single attempt, or a mapped [`HttpError`] when
576    /// retries abort or limits are exceeded.
577    async fn execute_with_retry(
578        &self,
579        request: HttpRequest,
580        retry_options: HttpRetryOptions,
581        honor_retry_after: bool,
582    ) -> HttpResult<HttpResponse> {
583        let policy = self.build_retry_executor(&retry_options, honor_retry_after)?;
584        let client = self.clone();
585        let result = policy
586            .run_async(move || {
587                let client = client.clone();
588                let request = request.clone();
589                async move { client.execute_once(request).await }
590            })
591            .await;
592        map_retry_result(result)
593    }
594
595    /// Runs [`HttpClient::execute_stream_once`] under the configured retry
596    /// policy.
597    ///
598    /// # Parameters
599    /// - `request`: Built request passed to each
600    ///   [`HttpClient::execute_stream_once`] attempt.
601    /// - `retry_options`: Effective retry options for this request.
602    /// - `honor_retry_after`: Whether to honor `Retry-After` on `429`.
603    ///
604    /// # Returns
605    /// Same as a successful single streaming attempt, or a mapped [`HttpError`]
606    /// when retries abort or limits are exceeded.
607    async fn execute_stream_with_retry(
608        &self,
609        request: HttpRequest,
610        retry_options: HttpRetryOptions,
611        honor_retry_after: bool,
612    ) -> HttpResult<HttpStreamResponse> {
613        let policy = self.build_retry_executor(&retry_options, honor_retry_after)?;
614        let client = self.clone();
615        let result = policy
616            .run_async(move || {
617                let client = client.clone();
618                let request = request.clone();
619                async move { client.execute_stream_once(request).await }
620            })
621            .await;
622        map_retry_result(result)
623    }
624
625    /// Parses `request.path` as a URL or joins it to `base_url` when relative.
626    ///
627    /// # Parameters
628    /// - `request`: Request whose `path` and implied base are used.
629    ///
630    /// # Returns
631    /// Resolved [`Url`] or [`HttpError::invalid_url`] if resolution fails.
632    fn resolve_url(&self, request: &HttpRequest) -> HttpResult<Url> {
633        if let Ok(url) = Url::parse(&request.path) {
634            self.validate_resolved_url_host(&url)?;
635            return Ok(url);
636        }
637
638        let base = self.options.base_url.as_ref().ok_or_else(|| {
639            HttpError::invalid_url(format!(
640                "Cannot resolve relative path '{}' without base_url",
641                request.path
642            ))
643        })?;
644
645        let url = base.join(&request.path).map_err(|error| {
646            HttpError::invalid_url(format!(
647                "Failed to resolve path '{}' against base URL '{}': {}",
648                request.path, base, error
649            ))
650        })?;
651        self.validate_resolved_url_host(&url)?;
652        Ok(url)
653    }
654
655    /// Validates host constraints for a resolved URL under current client options.
656    ///
657    /// # Parameters
658    /// - `url`: Fully resolved request URL.
659    ///
660    /// # Returns
661    /// `Ok(())` when host is allowed by options.
662    ///
663    /// # Errors
664    /// Returns [`HttpError::invalid_url`] when `ipv4_only=true` and `url` uses an IPv6 literal host.
665    fn validate_resolved_url_host(&self, url: &Url) -> HttpResult<()> {
666        if self.options.ipv4_only && matches!(url.host(), Some(Host::Ipv6(_))) {
667            return Err(HttpError::invalid_url(format!(
668                "IPv6 literal host is not allowed when ipv4_only=true: {}",
669                url
670            )));
671        }
672        Ok(())
673    }
674
675    /// Merges default headers, injector output, and per-request headers (later
676    /// wins on duplicates).
677    ///
678    /// # Parameters
679    /// - `request`: Request supplying extra headers.
680    ///
681    /// # Returns
682    /// Final [`HeaderMap`] or error if an injector fails.
683    async fn build_headers(&self, request: &HttpRequest) -> HttpResult<HeaderMap> {
684        let mut headers = self.options.default_headers.clone();
685
686        for injector in &self.injectors {
687            injector.apply(&mut headers)?;
688        }
689        for injector in &self.async_injectors {
690            injector.apply(&mut headers).await?;
691        }
692
693        headers.extend(request.headers.clone());
694        Ok(headers)
695    }
696
697    /// Sends the built request with a write-phase timeout (time to finish
698    /// sending the request).
699    ///
700    /// # Parameters
701    /// - `builder`: Reqwest request builder (method, URL, headers, body already
702    ///   set).
703    /// - `method`: Method for error context.
704    /// - `url`: URL for error context.
705    ///
706    /// # Returns
707    /// Raw [`reqwest::Response`] or [`HttpError`] (transport, write timeout,
708    /// etc.).
709    async fn send_with_write_timeout(
710        &self,
711        builder: reqwest::RequestBuilder,
712        method: http::Method,
713        url: Url,
714        cancellation_token: Option<&CancellationToken>,
715    ) -> HttpResult<Response> {
716        let timeout = self.options.timeouts.write_timeout;
717        let send_future = tokio::time::timeout(timeout, builder.send());
718        let next = if let Some(token) = cancellation_token {
719            tokio::select! {
720                _ = token.cancelled() => {
721                    return Err(HttpError::cancelled("Request cancelled while sending")
722                        .with_method(method)
723                        .with_url(url));
724                }
725                send_result = send_future => send_result,
726            }
727        } else {
728            send_future.await
729        };
730        match next {
731            Ok(Ok(response)) => Ok(response),
732            Ok(Err(error)) => Err(map_reqwest_error(
733                error,
734                HttpErrorKind::Transport,
735                Some(method),
736                Some(url),
737            )),
738            Err(_) => Err(HttpError::write_timeout(format!(
739                "Write timeout after {:?} while sending request",
740                timeout
741            ))
742            .with_method(method)
743            .with_url(url)),
744        }
745    }
746
747    /// Reads the entire response body with a read timeout.
748    ///
749    /// # Parameters
750    /// - `response`: Successful response whose body will be consumed.
751    /// - `method`: Method for error context.
752    /// - `url`: URL for error context.
753    ///
754    /// # Returns
755    /// Body as [`Bytes`] or [`HttpError`] (decode/read timeout).
756    async fn read_body_with_timeout(
757        &self,
758        response: Response,
759        method: http::Method,
760        url: Url,
761        cancellation_token: Option<&CancellationToken>,
762    ) -> HttpResult<Bytes> {
763        let timeout = self.options.timeouts.read_timeout;
764        let read_future = tokio::time::timeout(timeout, response.bytes());
765        let next = if let Some(token) = cancellation_token {
766            tokio::select! {
767                _ = token.cancelled() => {
768                    return Err(HttpError::cancelled("Request cancelled while reading response body")
769                        .with_method(method)
770                        .with_url(url));
771                }
772                read_result = read_future => read_result,
773            }
774        } else {
775            read_future.await
776        };
777        match next {
778            Ok(Ok(body)) => Ok(body),
779            Ok(Err(error)) => Err(map_reqwest_error(
780                error,
781                HttpErrorKind::Decode,
782                Some(method),
783                Some(url),
784            )),
785            Err(_) => Err(HttpError::read_timeout(format!(
786                "Read timeout after {:?} while reading response body",
787                timeout
788            ))
789            .with_method(method)
790            .with_url(url)),
791        }
792    }
793
794    /// Reads and renders a bounded preview for a non-success response body.
795    ///
796    /// # Parameters
797    /// - `response`: Non-success response whose body will be consumed.
798    ///
799    /// # Returns
800    /// Rendered preview text. On preview read failure, returns a descriptive placeholder.
801    async fn read_error_response_preview(&self, mut response: Response) -> String {
802        let read_timeout = self.options.timeouts.read_timeout;
803        let max_bytes = self.options.logging.body_size_limit.max(1);
804        let mut preview = Vec::new();
805        let mut truncated = false;
806
807        loop {
808            let next = tokio::time::timeout(read_timeout, response.chunk()).await;
809            match next {
810                Ok(Ok(Some(chunk))) => {
811                    if preview.len() >= max_bytes {
812                        truncated = true;
813                        break;
814                    }
815                    let remaining = max_bytes - preview.len();
816                    if chunk.len() > remaining {
817                        preview.extend_from_slice(&chunk[..remaining]);
818                        truncated = true;
819                        break;
820                    }
821                    preview.extend_from_slice(&chunk);
822                }
823                Ok(Ok(None)) => break,
824                Ok(Err(error)) => {
825                    return format!(
826                        "<error body unavailable: failed to read response body: {}>",
827                        error
828                    );
829                }
830                Err(_) => {
831                    return format!(
832                        "<error body unavailable: read timeout after {:?}>",
833                        read_timeout
834                    );
835                }
836            }
837        }
838
839        render_error_body_preview(&preview, truncated)
840    }
841}
842
843/// Builds a cancelled error when `token` is already cancelled.
844///
845/// # Parameters
846/// - `token`: Optional cancellation token for this request.
847/// - `method`: Request method for error context.
848/// - `url`: Request URL for error context.
849/// - `message`: Cancellation message.
850///
851/// # Returns
852/// `Some(HttpError)` when cancelled, otherwise `None`.
853fn cancelled_request_error_if_needed(
854    token: Option<&CancellationToken>,
855    method: &http::Method,
856    url: &Url,
857    message: &str,
858) -> Option<HttpError> {
859    if token.is_some_and(CancellationToken::is_cancelled) {
860        Some(
861            HttpError::cancelled(message.to_string())
862                .with_method(method.clone())
863                .with_url(url.clone()),
864        )
865    } else {
866        None
867    }
868}
869
870/// Clones request body content for request logging.
871///
872/// # Parameters
873/// - `body`: Request body variant.
874///
875/// # Returns
876/// Optional byte payload for logger previewing.
877fn clone_request_body_for_log(body: &HttpRequestBody) -> Option<Bytes> {
878    match body {
879        HttpRequestBody::Bytes(bytes)
880        | HttpRequestBody::Json(bytes)
881        | HttpRequestBody::Form(bytes)
882        | HttpRequestBody::Multipart(bytes)
883        | HttpRequestBody::Ndjson(bytes) => Some(bytes.clone()),
884        HttpRequestBody::Text(text) => Some(Bytes::from(text.clone())),
885        HttpRequestBody::Empty => None,
886    }
887}
888
889/// Applies request body variant to a reqwest request builder.
890///
891/// # Parameters
892/// - `builder`: Request builder with method/url/headers/query already set.
893/// - `body`: Request body variant to apply.
894///
895/// # Returns
896/// Updated builder containing the request body payload.
897fn apply_request_body(
898    builder: reqwest::RequestBuilder,
899    body: HttpRequestBody,
900) -> reqwest::RequestBuilder {
901    match body {
902        HttpRequestBody::Empty => builder,
903        HttpRequestBody::Bytes(bytes)
904        | HttpRequestBody::Json(bytes)
905        | HttpRequestBody::Form(bytes)
906        | HttpRequestBody::Multipart(bytes)
907        | HttpRequestBody::Ndjson(bytes) => builder.body(bytes),
908        HttpRequestBody::Text(text) => builder.body(text),
909    }
910}
911
912/// Converts a [`RetryResult`] from the HTTP retry executor into [`HttpResult`].
913///
914/// Successful attempts pass through. Retry exhaustion and deadline failures are
915/// turned into [`HttpError`] values with additional context on the message when
916/// applicable.
917///
918/// # Parameters
919/// - `result`: Outcome of the retry executor after one or more async attempts.
920///
921/// # Returns
922/// The successful value, or an [`HttpError`] describing abort, exhaustion, or
923/// deadline overrun.
924fn map_retry_result<T>(result: RetryResult<T, HttpError>) -> HttpResult<T> {
925    match result {
926        Ok(value) => Ok(value),
927        Err(RetryError::Aborted { failure, .. }) => map_retry_failure(failure),
928        Err(RetryError::AttemptsExceeded {
929            attempts,
930            max_attempts,
931            last_failure,
932            ..
933        }) => {
934            let mut error = map_retry_failure_to_error(last_failure);
935            error.message = format!(
936                "{} (retry attempts exhausted: {attempts}/{max_attempts})",
937                error.message
938            );
939            Err(error)
940        }
941        Err(RetryError::MaxElapsedExceeded {
942            elapsed,
943            max_elapsed,
944            last_failure: Some(last_failure),
945            ..
946        }) => {
947            let mut error = map_retry_failure_to_error(last_failure);
948            error.message = format!(
949                "{} (retry max duration exceeded: {elapsed:?}/{max_elapsed:?})",
950                error.message
951            );
952            Err(error)
953        }
954        Err(RetryError::MaxElapsedExceeded {
955            elapsed,
956            max_elapsed,
957            last_failure: None,
958            ..
959        }) => Err(HttpError::other(format!(
960            "HTTP retry max duration exceeded before a retryable error was captured: {elapsed:?}/{max_elapsed:?}"
961        ))),
962    }
963}
964
965/// Maps a single retry [`AttemptFailure`] into [`HttpResult`].
966///
967/// # Parameters
968/// - `failure`: Single attempt outcome from the retry layer.
969///
970/// # Returns
971/// Always `Err`: either the wrapped [`HttpError`] or a synthesized timeout
972/// message.
973fn map_retry_failure<T>(failure: AttemptFailure<HttpError>) -> HttpResult<T> {
974    Err(map_retry_failure_to_error(failure))
975}
976
977/// Converts a retry-layer attempt failure into [`HttpError`].
978///
979/// # Parameters
980/// - `failure`: Attempt failure from the retry executor.
981///
982/// # Returns
983/// Mapped [`HttpError`] with timeout context when applicable.
984fn map_retry_failure_to_error(failure: AttemptFailure<HttpError>) -> HttpError {
985    match failure {
986        AttemptFailure::Error(error) => error,
987        AttemptFailure::AttemptTimeout { elapsed, timeout } => HttpError::other(format!(
988            "HTTP retry attempt timeout after {elapsed:?} (timeout: {timeout:?})"
989        )),
990    }
991}
992
993/// Maps a [`reqwest::Error`] into [`HttpError`] with best-effort
994/// [`HttpErrorKind`] and optional context.
995///
996/// # Parameters
997/// - `error`: Underlying reqwest error.
998/// - `default_kind`: Kind used when reqwest does not classify the error more
999///   specifically.
1000/// - `method`: Optional request method to attach.
1001/// - `url`: Optional request URL to attach.
1002///
1003/// # Returns
1004/// Configured [`HttpError`] including chained source.
1005fn map_reqwest_error(
1006    error: reqwest::Error,
1007    default_kind: HttpErrorKind,
1008    method: Option<http::Method>,
1009    url: Option<Url>,
1010) -> HttpError {
1011    let kind = if error.is_timeout() {
1012        classify_reqwest_timeout_kind(&error)
1013    } else if error.is_decode() {
1014        HttpErrorKind::Decode
1015    } else if error.is_status() {
1016        HttpErrorKind::Status
1017    } else if error.is_request() && error.url().is_none() {
1018        HttpErrorKind::InvalidUrl
1019    } else {
1020        default_kind
1021    };
1022
1023    let mut result = HttpError::new(kind, format!("HTTP transport error: {}", error));
1024    if let Some(method) = method {
1025        result = result.with_method(method);
1026    }
1027    if let Some(url) = url {
1028        result = result.with_url(url);
1029    }
1030    result.with_source(error)
1031}
1032
1033/// Classifies reqwest timeout errors into connect-timeout vs request-timeout.
1034///
1035/// # Parameters
1036/// - `error`: Reqwest timeout error to classify.
1037///
1038/// # Returns
1039/// [`HttpErrorKind::ConnectTimeout`] when the timeout message indicates a connect-phase timeout;
1040/// otherwise [`HttpErrorKind::RequestTimeout`].
1041fn classify_reqwest_timeout_kind(error: &reqwest::Error) -> HttpErrorKind {
1042    let message = error.to_string().to_ascii_lowercase();
1043    if message.contains("connect") {
1044        HttpErrorKind::ConnectTimeout
1045    } else {
1046        HttpErrorKind::RequestTimeout
1047    }
1048}
1049
1050/// Parses `Retry-After` from response headers for `429 Too Many Requests`.
1051///
1052/// # Parameters
1053/// - `status`: HTTP status code.
1054/// - `headers`: Response headers.
1055///
1056/// # Returns
1057/// Parsed retry delay when `status` is `429` and `Retry-After` is an integer
1058/// number of seconds.
1059fn parse_retry_after(status: StatusCode, headers: &HeaderMap) -> Option<Duration> {
1060    if status != StatusCode::TOO_MANY_REQUESTS {
1061        return None;
1062    }
1063    headers
1064        .get(RETRY_AFTER)
1065        .and_then(|value| value.to_str().ok())
1066        .and_then(parse_retry_after_value)
1067}
1068
/// Interprets a raw `Retry-After` header value as a whole number of seconds.
///
/// Surrounding whitespace is ignored. The HTTP-date form of `Retry-After` is
/// not supported and yields `None`.
///
/// # Parameters
/// - `value`: Raw `Retry-After` header value.
///
/// # Returns
/// The delay as a [`Duration`], or `None` when the trimmed value does not
/// parse as a `u64`.
fn parse_retry_after_value(value: &str) -> Option<Duration> {
    value
        .trim()
        .parse::<u64>()
        .map(Duration::from_secs)
        .ok()
}
1080
/// Renders a human-readable error-body preview from raw bytes.
///
/// A preview that was cut at a byte limit may end in the middle of a
/// multi-byte UTF-8 character. In that case (and only when `truncated` is
/// set) the valid text before the cut is rendered, instead of reporting the
/// whole preview as binary.
///
/// # Parameters
/// - `bytes`: Captured body bytes (already size-limited).
/// - `truncated`: Whether additional bytes were omitted.
///
/// # Returns
/// - `"<empty>"` when `bytes` is empty.
/// - The UTF-8 text (or its valid prefix, see above) when the bytes decode.
/// - `"<binary N bytes>"` otherwise.
///
/// The text and binary forms get a `"...<truncated>"` suffix when `truncated`
/// is set.
fn render_error_body_preview(bytes: &[u8], truncated: bool) -> String {
    if bytes.is_empty() {
        return "<empty>".to_string();
    }

    let suffix = if truncated { "...<truncated>" } else { "" };
    match std::str::from_utf8(bytes) {
        Ok(text) => format!("{text}{suffix}"),
        // `error_len() == None` means the input ended inside a character,
        // which is what a byte-limit cut produces. Requiring a non-empty
        // valid prefix avoids rendering an empty text preview.
        Err(error)
            if truncated && error.error_len().is_none() && error.valid_up_to() > 0 =>
        {
            // Everything before `valid_up_to()` is valid UTF-8, so this
            // conversion is lossless.
            let text = String::from_utf8_lossy(&bytes[..error.valid_up_to()]);
            format!("{text}{suffix}")
        }
        Err(_) => format!("<binary {} bytes>{suffix}", bytes.len()),
    }
}