zng_task/
http.rs

1#![cfg(feature = "http")]
2// suppress nag about very simple boxed closure signatures.
3#![expect(clippy::type_complexity)]
4
5//! HTTP client.
6//!
//! This module is a thin wrapper around the [`isahc`] crate that just limits the API surface to only
//! `async` methods without the async suffix. You can convert between the [`isahc`] types and the types in this module.
9//!
10//! # Examples
11//!
12//! Get some text:
13//!
14//! ```
15//! # use zng_task as task;
16//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
17//! let text = task::http::get_txt("https://httpbin.org/base64/SGVsbG8gV29ybGQ=").await?;
18//! println!("{text}!");
19//! # Ok(()) }
20//! ```
21//!
22//! [`isahc`]: https://docs.rs/isahc
23
24mod cache;
25mod util;
26
27pub use cache::*;
28use zng_var::impl_from_and_into_var;
29
30use std::convert::TryFrom;
31use std::error::Error as StdError;
32use std::pin::Pin;
33use std::sync::Arc;
34use std::time::Duration;
35use std::{fmt, mem};
36
37use crate::Progress;
38
39use super::io::AsyncRead;
40
41use isahc::config::Configurable;
42pub use isahc::config::RedirectPolicy;
43pub use isahc::cookies::{Cookie, CookieJar};
44pub use isahc::http::{Method, StatusCode, Uri, header, uri};
45
46use futures_lite::io::{AsyncReadExt, BufReader};
47use isahc::{AsyncReadResponseExt, ResponseExt};
48use parking_lot::{Mutex, const_mutex};
49
50use zng_txt::{Txt, formatx};
51use zng_unit::*;
52
53/// Marker trait for types that try-to-convert to [`Uri`].
54///
55/// All types `T` that match `Uri: TryFrom<T>, <Uri as TryFrom<T>>::Error: Into<isahc::http::Error>` implement this trait.
56#[diagnostic::on_unimplemented(note = "`TryUri` is implemented for all `T` where `Uri: TryFrom<T, Error: Into<isahc::http::Error>>`")]
57pub trait TryUri {
58    /// Tries to convert `self` into [`Uri`].
59    fn try_uri(self) -> Result<Uri, Error>;
60}
61impl<U> TryUri for U
62where
63    Uri: TryFrom<U>,
64    <Uri as TryFrom<U>>::Error: Into<isahc::http::Error>,
65{
66    fn try_uri(self) -> Result<Uri, Error> {
67        Uri::try_from(self).map_err(|e| e.into().into())
68    }
69}
70
71/// Marker trait for types that try-to-convert to [`Method`].
72///
73/// All types `T` that match `Method: TryFrom<T>, <Method as TryFrom<T>>::Error: Into<isahc::http::Error>` implement this trait.
74#[diagnostic::on_unimplemented(note = "`TryMethod` is implemented for all `T` where `Method: TryFrom<T, Error: Into<isahc::http::Error>>`")]
75pub trait TryMethod {
76    /// Tries to convert `self` into [`Method`].
77    fn try_method(self) -> Result<Method, Error>;
78}
79impl<U> TryMethod for U
80where
81    Method: TryFrom<U>,
82    <isahc::http::Method as TryFrom<U>>::Error: Into<isahc::http::Error>,
83{
84    fn try_method(self) -> Result<Method, Error> {
85        Method::try_from(self).map_err(|e| e.into().into())
86    }
87}
88
89/// Marker trait for types that try-to-convert to [`Body`].
90///
91/// All types `T` that match `isahc::AsyncBody: TryFrom<T>, <isahc::AsyncBody as TryFrom<T>>::Error: Into<isahc::http::Error>`
92/// implement this trait.
93#[diagnostic::on_unimplemented(note = "`TryBody` is implemented for all `T` where `Body: TryFrom<T, Error: Into<isahc::http::Error>>`")]
94pub trait TryBody {
95    /// Tries to convert `self` into [`Body`].
96    fn try_body(self) -> Result<Body, Error>;
97}
98impl<U> TryBody for U
99where
100    isahc::AsyncBody: TryFrom<U>,
101    <isahc::AsyncBody as TryFrom<U>>::Error: Into<isahc::http::Error>,
102{
103    fn try_body(self) -> Result<Body, Error> {
104        match isahc::AsyncBody::try_from(self) {
105            Ok(r) => Ok(Body(r)),
106            Err(e) => Err(e.into().into()),
107        }
108    }
109}
110
111/// Marker trait for types that try-to-convert to [`header::HeaderName`].
112///
113/// All types `T` that match `header::HeaderName: TryFrom<T>, <header::HeaderName as TryFrom<T>>::Error: Into<isahc::http::Error>`
114/// implement this trait.
115#[diagnostic::on_unimplemented(
116    note = "`TryHeaderName` is implemented for all `T` where `HeaderName: TryFrom<T, Error: Into<isahc::http::Error>>`"
117)]
118pub trait TryHeaderName {
119    /// Tries to convert `self` into [`Body`].
120    fn try_header_name(self) -> Result<header::HeaderName, Error>;
121}
122impl<U> TryHeaderName for U
123where
124    header::HeaderName: TryFrom<U>,
125    <header::HeaderName as TryFrom<U>>::Error: Into<isahc::http::Error>,
126{
127    fn try_header_name(self) -> Result<header::HeaderName, Error> {
128        header::HeaderName::try_from(self).map_err(|e| e.into().into())
129    }
130}
131
132/// Marker trait for types that try-to-convert to [`header::HeaderValue`].
133///
134/// All types `T` that match `header::HeaderValue: TryFrom<T>, <header::HeaderValue as TryFrom<T>>::Error: Into<isahc::http::Error>`
135/// implement this trait.
136#[diagnostic::on_unimplemented(
137    note = "`TryHeaderValue` is implemented for all `T` where `HeaderValue: TryFrom<T, Error: Into<isahc::http::Error>>`"
138)]
139pub trait TryHeaderValue {
140    /// Tries to convert `self` into [`Body`].
141    fn try_header_value(self) -> Result<header::HeaderValue, Error>;
142}
143impl<U> TryHeaderValue for U
144where
145    header::HeaderValue: TryFrom<U>,
146    <header::HeaderValue as TryFrom<U>>::Error: Into<isahc::http::Error>,
147{
148    fn try_header_value(self) -> Result<header::HeaderValue, Error> {
149        header::HeaderValue::try_from(self).map_err(|e| e.into().into())
150    }
151}
152
153/// HTTP request.
154///
155/// Use [`send`] to send a request.
156#[derive(Debug)]
157pub struct Request {
158    req: isahc::Request<Body>,
159    limits: ResponseLimits,
160}
161impl Request {
162    /// Starts an empty builder.
163    ///
164    /// # Examples
165    ///
166    /// ```
167    /// use zng_task::http;
168    ///
169    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
170    /// let request = http::Request::builder()
171    ///     .method(http::Method::PUT)?
172    ///     .uri("https://httpbin.org/put")?
173    ///     .build();
174    /// # Ok(()) }
175    /// ```
176    ///
177    /// Call [`build`] or [`body`] to finish building the request, note that there are is also an associated function
178    /// to start a builder for each HTTP method and uri.
179    ///
180    /// [`build`]: RequestBuilder::build
181    /// [`body`]: RequestBuilder::body
182    pub fn builder() -> RequestBuilder {
183        RequestBuilder::start(isahc::Request::builder())
184    }
185
186    /// Starts building a GET request.
187    ///
188    /// # Examples
189    ///
190    /// ```
191    /// use zng_task::http;
192    ///
193    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
194    /// let get = http::Request::get("https://httpbin.org/get")?.build();
195    /// # Ok(()) }
196    /// ```
197    pub fn get(uri: impl TryUri) -> Result<RequestBuilder, Error> {
198        Ok(RequestBuilder::start(isahc::Request::get(uri.try_uri()?)))
199    }
200
201    /// Starts building a PUT request.
202    ///
203    /// # Examples
204    ///
205    /// ```
206    /// use zng_task::http;
207    ///
208    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
209    /// let put = http::Request::put("https://httpbin.org/put")?
210    ///     .header("accept", "application/json")?
211    ///     .build();
212    /// # Ok(()) }
213    /// ```
214    pub fn put(uri: impl TryUri) -> Result<RequestBuilder, Error> {
215        Ok(RequestBuilder::start(isahc::Request::put(uri.try_uri()?)))
216    }
217
218    /// Starts building a POST request.
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// use zng_task::http;
224    ///
225    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
226    /// let post = http::Request::post("https://httpbin.org/post")?
227    ///     .header("accept", "application/json")?
228    ///     .build();
229    /// # Ok(()) }
230    /// ```
231    pub fn post(uri: impl TryUri) -> Result<RequestBuilder, Error> {
232        Ok(RequestBuilder::start(isahc::Request::post(uri.try_uri()?)))
233    }
234
235    /// Starts building a DELETE request.
236    ///
237    /// # Examples
238    ///
239    /// ```
240    /// use zng_task::http;
241    ///
242    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
243    /// let delete = http::Request::delete("https://httpbin.org/delete")?
244    ///     .header("accept", "application/json")?
245    ///     .build();
246    /// # Ok(()) }
247    /// ```
248    pub fn delete(uri: impl TryUri) -> Result<RequestBuilder, Error> {
249        Ok(RequestBuilder::start(isahc::Request::delete(uri.try_uri()?)))
250    }
251
252    /// Starts building a PATCH request.
253    ///
254    /// # Examples
255    ///
256    /// ```
257    /// use zng_task::http;
258    ///
259    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
260    /// let patch = http::Request::patch("https://httpbin.org/patch")?
261    ///     .header("accept", "application/json")?
262    ///     .build();
263    /// # Ok(()) }
264    /// ```
265    pub fn patch(uri: impl TryUri) -> Result<RequestBuilder, Error> {
266        Ok(RequestBuilder::start(isahc::Request::patch(uri.try_uri()?)))
267    }
268
269    /// Starts building a HEAD request.
270    ///
271    /// # Examples
272    ///
273    /// ```
274    /// use zng_task::http;
275    ///
276    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
277    /// let head = http::Request::head("https://httpbin.org")?.build();
278    /// # Ok(()) }
279    /// ```
280    pub fn head(uri: impl TryUri) -> Result<RequestBuilder, Error> {
281        Ok(RequestBuilder::start(isahc::Request::head(uri.try_uri()?)))
282    }
283
284    /// Returns a reference to the associated URI.
285    pub fn uri(&self) -> &Uri {
286        self.req.uri()
287    }
288
289    /// Returns a reference to the associated HTTP method.
290    pub fn method(&self) -> &Method {
291        self.req.method()
292    }
293
294    /// Returns a reference to the associated header field map.
295    pub fn headers(&self) -> &header::HeaderMap {
296        self.req.headers()
297    }
298
299    /// Create a clone of the request method, URI, version and headers, with a new `body`.
300    pub fn clone_with(&self, body: impl TryBody) -> Result<Self, Error> {
301        let body = body.try_body()?;
302
303        let mut req = isahc::Request::new(body);
304        *req.method_mut() = self.req.method().clone();
305        *req.uri_mut() = self.req.uri().clone();
306        *req.version_mut() = self.req.version();
307        let headers = req.headers_mut();
308        for (name, value) in self.headers() {
309            headers.insert(name.clone(), value.clone());
310        }
311
312        Ok(Self {
313            req,
314            limits: self.limits.clone(),
315        })
316    }
317}
318
319#[derive(Debug, Default, Clone)]
320struct ResponseLimits {
321    max_length: Option<ByteLength>,
322    require_length: bool,
323}
324impl ResponseLimits {
325    fn check(&self, response: isahc::Response<isahc::AsyncBody>) -> Result<isahc::Response<isahc::AsyncBody>, Error> {
326        if self.require_length || self.max_length.is_some() {
327            let response = Response(response);
328            if let Some(len) = response.content_len() {
329                if let Some(max) = self.max_length
330                    && max < len
331                {
332                    return Err(Error::MaxLength {
333                        content_length: Some(len),
334                        max_length: max,
335                    });
336                }
337            } else if self.require_length {
338                return Err(Error::RequireLength);
339            }
340
341            if let Some(max) = self.max_length {
342                let (parts, body) = response.0.into_parts();
343                let response = isahc::Response::from_parts(
344                    parts,
345                    isahc::AsyncBody::from_reader(super::io::ReadLimited::new(body, max, move || {
346                        std::io::Error::new(std::io::ErrorKind::InvalidData, MaxLengthError(None, max))
347                    })),
348                );
349
350                Ok(response)
351            } else {
352                Ok(response.0)
353            }
354        } else {
355            Ok(response)
356        }
357    }
358}
359
360/// A [`Request`] builder.
361///
362/// You can use [`Request::builder`] to start an empty builder.
363#[derive(Debug)]
364pub struct RequestBuilder {
365    builder: isahc::http::request::Builder,
366    limits: ResponseLimits,
367}
368impl Default for RequestBuilder {
369    fn default() -> Self {
370        Request::builder()
371    }
372}
373impl RequestBuilder {
374    /// New default request builder.
375    pub fn new() -> Self {
376        Request::builder()
377    }
378
379    fn start(builder: isahc::http::request::Builder) -> Self {
380        Self {
381            builder,
382            limits: ResponseLimits::default(),
383        }
384    }
385
386    /// Set the HTTP method for this request.
387    pub fn method(self, method: impl TryMethod) -> Result<Self, Error> {
388        Ok(Self {
389            builder: self.builder.method(method.try_method()?),
390            limits: self.limits,
391        })
392    }
393
394    /// Set the URI for this request.
395    pub fn uri(self, uri: impl TryUri) -> Result<Self, Error> {
396        Ok(Self {
397            builder: self.builder.uri(uri.try_uri()?),
398            limits: self.limits,
399        })
400    }
401
402    /// Appends a header to this request.
403    pub fn header(self, name: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
404        Ok(Self {
405            builder: self.builder.header(name.try_header_name()?, value.try_header_value()?),
406            limits: self.limits,
407        })
408    }
409
410    /// Set a cookie jar to use to accept, store, and supply cookies for incoming responses and outgoing requests.
411    ///
412    /// Note that the [`default_client`] already has a cookie jar.
413    pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
414        Self {
415            builder: self.builder.cookie_jar(cookie_jar),
416            limits: self.limits,
417        }
418    }
419
420    /// Specify a maximum amount of time that a complete request/response cycle is allowed to
421    /// take before being aborted. This includes DNS resolution, connecting to the server,
422    /// writing the request, and reading the response.
423    ///
424    /// Note that this includes the response read operation, so if you get a response but don't
425    /// read-it within this timeout you will get a [`TimedOut`] IO error.
426    ///
427    /// By default no timeout is used.
428    ///
429    /// [`TimedOut`]: https://doc.rust-lang.org/nightly/std/io/enum.ErrorKind.html#variant.TimedOut
430    pub fn timeout(self, timeout: Duration) -> Self {
431        Self {
432            builder: self.builder.timeout(timeout),
433            limits: self.limits,
434        }
435    }
436
437    /// Set a timeout for establishing connections to a host.
438    ///
439    /// If not set, the [`default_client`] default of 90 seconds will be used.
440    pub fn connect_timeout(self, timeout: Duration) -> Self {
441        Self {
442            builder: self.builder.connect_timeout(timeout),
443            limits: self.limits,
444        }
445    }
446
447    /// Specify a maximum amount of time where transfer rate can go below a minimum speed limit.
448    ///
449    /// The `low_speed` limit is in bytes/s. No low-speed limit is configured by default.
450    pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
451        Self {
452            builder: self.builder.low_speed_timeout(low_speed, timeout),
453            limits: self.limits,
454        }
455    }
456
457    /// Set a policy for automatically following server redirects.
458    ///
459    /// If enabled the "Referer" header will be set automatically too.
460    ///
461    /// The [`default_client`] follows up-to 20 redirects.
462    pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
463        if !matches!(policy, RedirectPolicy::None) {
464            Self {
465                builder: self.builder.redirect_policy(policy).auto_referer(),
466                limits: self.limits,
467            }
468        } else {
469            Self {
470                builder: self.builder.redirect_policy(policy),
471                limits: self.limits,
472            }
473        }
474    }
475
476    /// Enable or disable automatic decompression of the response body.
477    ///
478    /// If enabled the "Accept-Encoding" will also be set automatically, if it was not set using [`header`].
479    ///
480    /// This is enabled by default.
481    ///
482    /// [`header`]: Self::header
483    pub fn auto_decompress(self, enabled: bool) -> Self {
484        Self {
485            builder: self.builder.automatic_decompression(enabled),
486            limits: self.limits,
487        }
488    }
489
490    /// Set a maximum upload speed for the request body, in bytes per second.
491    pub fn max_upload_speed(self, max: u64) -> Self {
492        Self {
493            builder: self.builder.max_upload_speed(max),
494            limits: self.limits,
495        }
496    }
497
498    /// Set a maximum download speed for the response body, in bytes per second.
499    pub fn max_download_speed(self, max: u64) -> Self {
500        Self {
501            builder: self.builder.max_download_speed(max),
502            limits: self.limits,
503        }
504    }
505
506    /// Set the maximum response content length allowed.
507    ///
508    /// If the `Content-Length` is present on the response and it exceeds this limit an error is
509    /// returned immediately, otherwise if [`require_length`] is not enabled an error will be returned
510    /// only when the downloaded body length exceeds the limit.
511    ///
512    /// No limit by default.
513    ///
514    /// [`require_length`]: Self::require_length
515    pub fn max_length(mut self, max: ByteLength) -> Self {
516        self.limits.max_length = Some(max);
517        self
518    }
519
520    /// Set if the `Content-Length` header must be present in the response.
521    pub fn require_length(mut self, require: bool) -> Self {
522        self.limits.require_length = require;
523        self
524    }
525
526    /// Enable or disable metrics collecting.
527    ///
528    /// When enabled you can get the information using the [`Response::metrics`] method.
529    ///
530    /// This is enabled by default.
531    pub fn metrics(self, enable: bool) -> Self {
532        Self {
533            builder: self.builder.metrics(enable),
534            limits: self.limits,
535        }
536    }
537
538    /// Build the request without a body.
539    pub fn build(self) -> Request {
540        self.body(()).unwrap()
541    }
542
543    /// Build the request with a body.
544    pub fn body(self, body: impl TryBody) -> Result<Request, Error> {
545        Ok(Request {
546            req: self.builder.body(body.try_body()?).unwrap(),
547            limits: self.limits,
548        })
549    }
550
551    /// Build the request with more custom build calls in the [inner builder].
552    ///
553    /// [inner builder]: isahc::http::request::Builder
554    pub fn build_custom<F>(self, custom: F) -> Result<Request, Error>
555    where
556        F: FnOnce(isahc::http::request::Builder) -> isahc::http::Result<isahc::Request<isahc::AsyncBody>>,
557    {
558        let req = custom(self.builder)?;
559        Ok(Request {
560            req: req.map(Body),
561            limits: self.limits,
562        })
563    }
564}
565
566/// Head parts from a split [`Response`].
567pub type ResponseParts = isahc::http::response::Parts;
568
569/// HTTP response.
570#[derive(Debug)]
571pub struct Response(isahc::Response<isahc::AsyncBody>);
572impl Response {
573    /// Returns the [`StatusCode`].
574    pub fn status(&self) -> StatusCode {
575        self.0.status()
576    }
577
578    /// Returns a reference to the associated header field map.
579    pub fn headers(&self) -> &header::HeaderMap<header::HeaderValue> {
580        self.0.headers()
581    }
582
583    /// Decode content-length value if it is present in the headers.
584    pub fn content_len(&self) -> Option<ByteLength> {
585        self.0.body().len().map(|l| ByteLength(l as usize))
586    }
587
588    /// Get the configured cookie jar used for persisting cookies from this response, if any.
589    ///
590    /// Only returns `None` if the [`default_client`] was replaced by one with cookies disabled.
591    pub fn cookie_jar(&self) -> Option<&CookieJar> {
592        self.0.cookie_jar()
593    }
594
595    /// Read the response body as a string.
596    pub async fn text(&mut self) -> std::io::Result<Txt> {
597        self.0.text().await.map(Txt::from)
598    }
599
600    /// Get the effective URI of this response. This value differs from the
601    /// original URI provided when making the request if at least one redirect
602    /// was followed.
603    pub fn effective_uri(&self) -> Option<&Uri> {
604        self.0.effective_uri()
605    }
606
607    /// Read the response body as raw bytes.
608    pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
609        Body::bytes_impl(self.0.body_mut()).await
610    }
611
612    /// Read some bytes from the body, returns how many bytes where read.
613    pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
614        BufReader::new(self.0.body_mut()).read(buf).await
615    }
616
617    /// Read the from the body to exactly fill the buffer.
618    pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
619        BufReader::new(self.0.body_mut()).read_exact(buf).await
620    }
621
622    /// Deserialize the response body as JSON.
623    pub async fn json<O>(&mut self) -> Result<O, serde_json::Error>
624    where
625        O: serde::de::DeserializeOwned + std::marker::Unpin,
626    {
627        self.0.json().await
628    }
629
630    /// Metrics for the task transfer.
631    ///
632    /// Metrics are enabled in the [`default_client`] and can be toggled for each request using the
633    /// [`RequestBuilder::metrics`] method. If disabled returns [`Metrics::zero`].
634    pub fn metrics(&self) -> Metrics {
635        self.0.metrics().map(Metrics::from_isahc).unwrap_or_else(Metrics::zero)
636    }
637
638    /// Drop the request without dropping the connection.
639    ///
640    /// This receives and discards any remaining bytes in the response stream. When a response
641    /// is dropped without finishing the connection is discarded so it cannot be reused for connections
642    /// older then HTTP/2.
643    ///
644    /// You should call this method before dropping if you expect the remaining bytes to be consumed quickly and
645    /// don't known that HTTP/2 or newer is being used.
646    pub async fn consume(&mut self) -> std::io::Result<()> {
647        self.0.consume().await
648    }
649
650    /// Create a response with the given status and text body message.
651    pub fn new_message(status: impl Into<StatusCode>, msg: impl Into<String>) -> Self {
652        let status = status.into();
653        let msg = msg.into().into_bytes();
654        let msg = futures_lite::io::Cursor::new(msg);
655        let mut r = isahc::Response::new(isahc::AsyncBody::from_reader(msg));
656        *r.status_mut() = status;
657        Self(r)
658    }
659
660    /// New response.
661    pub fn new(status: StatusCode, headers: header::HeaderMap<header::HeaderValue>, body: Body) -> Self {
662        let mut r = isahc::Response::new(body.0);
663        *r.status_mut() = status;
664        *r.headers_mut() = headers;
665        Self(r)
666    }
667
668    /// Consumes the response returning the head and body parts.
669    pub fn into_parts(self) -> (ResponseParts, Body) {
670        let (p, b) = self.0.into_parts();
671        (p, Body(b))
672    }
673
674    /// New response from given head and body.
675    pub fn from_parts(parts: ResponseParts, body: Body) -> Self {
676        Self(isahc::Response::from_parts(parts, body.0))
677    }
678}
679impl From<Response> for isahc::Response<isahc::AsyncBody> {
680    fn from(r: Response) -> Self {
681        r.0
682    }
683}
684
685/// HTTP request body.
686///
687/// Use [`TryBody`] to convert types to body.
688#[derive(Debug, Default)]
689pub struct Body(isahc::AsyncBody);
690impl Body {
691    /// Create a new empty body.
692    ///
693    /// An empty body represents the *absence* of a body, which is semantically different than the presence of a body of zero length.
694    pub fn empty() -> Body {
695        Body(isahc::AsyncBody::empty())
696    }
697
698    /// Create a new body from a potentially static byte buffer.
699    ///
700    /// The body will have a known length equal to the number of bytes given.
701    ///
702    /// This will try to prevent a copy if the type passed in can be re-used, otherwise the buffer
703    /// will be copied first. This method guarantees to not require a copy for the following types:
704    pub fn from_bytes_static(bytes: impl AsRef<[u8]> + 'static) -> Self {
705        Body(isahc::AsyncBody::from_bytes_static(bytes))
706    }
707
708    /// Create a streaming body of unknown length.
709    pub fn from_reader(read: impl AsyncRead + Send + Sync + 'static) -> Self {
710        Body(isahc::AsyncBody::from_reader(read))
711    }
712
713    /// Create a streaming body of with known length.
714    pub fn from_reader_sized(read: impl AsyncRead + Send + Sync + 'static, size: u64) -> Self {
715        Body(isahc::AsyncBody::from_reader_sized(read, size))
716    }
717
718    /// Report if this body is empty.
719    ///
720    /// This is not necessarily the same as checking for zero length, since HTTP message bodies are optional,
721    /// there is a semantic difference between the absence of a body and the presence of a zero-length body.
722    /// This method will only return `true` for the former.
723    pub fn is_empty(&self) -> bool {
724        self.0.is_empty()
725    }
726
727    /// Get the size of the body, if known.
728    pub fn len(&self) -> Option<u64> {
729        self.0.len()
730    }
731
732    /// If this body is repeatable, reset the body stream back to the start of the content.
733    ///
734    /// Returns false if the body cannot be reset.
735    pub fn reset(&mut self) -> bool {
736        self.0.reset()
737    }
738
739    /// Read the body as raw bytes.
740    pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
741        Self::bytes_impl(&mut self.0).await
742    }
743    async fn bytes_impl(body: &mut isahc::AsyncBody) -> std::io::Result<Vec<u8>> {
744        let cap = body.len().unwrap_or(1024);
745        let mut bytes = Vec::with_capacity(cap as usize);
746        super::io::copy(body, &mut bytes).await?;
747        Ok(bytes)
748    }
749
750    /// Read the body and try to convert to UTF-8.
751    ///
752    /// Consider using [`Response::text`], it uses the header encoding information if available.
753    pub async fn text_utf8(&mut self) -> Result<Txt, Box<dyn std::error::Error>> {
754        let bytes = self.bytes().await?;
755        let r = String::from_utf8(bytes)?;
756        Ok(Txt::from(r))
757    }
758
759    /// Read some bytes from the body, returns how many bytes where read.
760    pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
761        BufReader::new(&mut self.0).read(buf).await
762    }
763
764    /// Read the from the body to exactly fill the buffer.
765    pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
766        BufReader::new(&mut self.0).read_exact(buf).await
767    }
768}
769impl From<Body> for isahc::AsyncBody {
770    fn from(r: Body) -> Self {
771        r.0
772    }
773}
774impl From<isahc::AsyncBody> for Body {
775    fn from(r: isahc::AsyncBody) -> Self {
776        Body(r)
777    }
778}
779impl From<()> for Body {
780    fn from(body: ()) -> Self {
781        Body(body.into())
782    }
783}
784impl From<String> for Body {
785    fn from(body: String) -> Self {
786        Body(body.into())
787    }
788}
789impl From<Txt> for Body {
790    fn from(body: Txt) -> Self {
791        Body(String::from(body).into())
792    }
793}
794impl From<Vec<u8>> for Body {
795    fn from(body: Vec<u8>) -> Self {
796        Body(body.into())
797    }
798}
799impl From<&'_ [u8]> for Body {
800    fn from(body: &[u8]) -> Self {
801        body.to_vec().into()
802    }
803}
804impl From<&'_ str> for Body {
805    fn from(body: &str) -> Self {
806        body.as_bytes().into()
807    }
808}
809impl<T: Into<Self>> From<Option<T>> for Body {
810    fn from(body: Option<T>) -> Self {
811        match body {
812            Some(body) => body.into(),
813            None => Self::empty(),
814        }
815    }
816}
817impl AsyncRead for Body {
818    fn poll_read(
819        self: std::pin::Pin<&mut Self>,
820        cx: &mut std::task::Context<'_>,
821        buf: &mut [u8],
822    ) -> std::task::Poll<std::io::Result<usize>> {
823        Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
824    }
825}
826
827/// Send a GET request to the `uri`.
828///
829/// The [`default_client`] is used to send the request.
830pub async fn get(uri: impl TryUri) -> Result<Response, Error> {
831    default_client().get(uri).await
832}
833
834/// Send a GET request to the `uri` and read the response as a string.
835///
836/// The [`default_client`] is used to send the request.
837pub async fn get_txt(uri: impl TryUri) -> Result<Txt, Error> {
838    default_client().get_txt(uri).await
839}
840
841/// Send a GET request to the `uri` and read the response as raw bytes.
842///
843/// The [`default_client`] is used to send the request.
844pub async fn get_bytes(uri: impl TryUri) -> Result<Vec<u8>, Error> {
845    default_client().get_bytes(uri).await
846}
847
848/// Send a GET request to the `uri` and de-serializes the response.
849///
850/// The [`default_client`] is used to send the request.
851pub async fn get_json<O>(uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
852where
853    O: serde::de::DeserializeOwned + std::marker::Unpin,
854{
855    default_client().get_json(uri).await
856}
857
858/// Send a HEAD request to the `uri`.
859///
860/// The [`default_client`] is used to send the request.
861pub async fn head(uri: impl TryUri) -> Result<Response, Error> {
862    default_client().head(uri).await
863}
864
865/// Send a PUT request to the `uri` with a given request body.
866///
867/// The [`default_client`] is used to send the request.
868pub async fn put(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
869    default_client().put(uri, body).await
870}
871
872/// Send a POST request to the `uri` with a given request body.
873///
874/// The [`default_client`] is used to send the request.
875pub async fn post(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
876    default_client().post(uri, body).await
877}
878
879/// Send a DELETE request to the `uri`.
880///
881/// The [`default_client`] is used to send the request.
882pub async fn delete(uri: impl TryUri) -> Result<Response, Error> {
883    default_client().delete(uri).await
884}
885
886/// Send a custom [`Request`].
887///
888/// The [`default_client`] is used to send the request.
889pub async fn send(request: Request) -> Result<Response, Error> {
890    default_client().send(request).await
891}
892
893/// The [`Client`] used by the functions in this module.
894///
895/// You can replace the default client at the start of the process using [`set_default_client_init`].
896///
897/// # Defaults
898///
899/// The default client is created using [`Client::new`].
900///
901/// [`isahc`]: https://docs.rs/isahc
902pub fn default_client() -> &'static Client {
903    use once_cell::sync::Lazy;
904
905    static SHARED: Lazy<Client> = Lazy::new(|| {
906        let ci = mem::replace(&mut *CLIENT_INIT.lock(), ClientInit::Inited);
907        if let ClientInit::Set(init) = ci {
908            init()
909        } else {
910            // browser defaults
911            Client::new()
912        }
913    });
914    &SHARED
915}
916
/// Initialization state shared by [`set_default_client_init`] and [`default_client`].
static CLIENT_INIT: Mutex<ClientInit> = const_mutex(ClientInit::None);

/// State of the [`default_client`] initialization.
enum ClientInit {
    /// No custom initializer was set.
    None,
    /// Custom initializer set by [`set_default_client_init`] that was not called yet.
    Set(Box<dyn FnOnce() -> Client + Send>),
    /// The [`default_client`] initialization already took the initializer, new initializers are rejected.
    Inited,
}
924
925/// Set a custom initialization function for the [`default_client`].
926///
927/// The [`default_client`] is used by all functions in this module and is initialized on the first usage,
928/// you can use this function before any HTTP operation to replace the [`isahc`] client.
929///
930/// Returns an error if the [`default_client`] was already initialized.
931///
932/// [`isahc`]: https://docs.rs/isahc
933pub fn set_default_client_init<I>(init: I) -> Result<(), DefaultAlreadyInitedError>
934where
935    I: FnOnce() -> Client + Send + 'static,
936{
937    let mut ci = CLIENT_INIT.lock();
938    if let ClientInit::Inited = &*ci {
939        Err(DefaultAlreadyInitedError {})
940    } else {
941        *ci = ClientInit::Set(Box::new(init));
942        Ok(())
943    }
944}
945
/// Error returned by [`set_default_client_init`] when the default client was already initialized.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct DefaultAlreadyInitedError {}
impl fmt::Display for DefaultAlreadyInitedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("default client already initialized, can only set before first use")
    }
}
impl std::error::Error for DefaultAlreadyInitedError {}
956
/// Information about the state of an HTTP request.
///
/// Can be created from the `isahc` metrics using [`Metrics::from_isahc`], the [`Metrics::zero`] value
/// has all counters and times set to zero.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Metrics {
    /// Number of bytes uploaded / estimated total.
    ///
    /// The first value is the number of bytes uploaded, the second is the total.
    pub upload_progress: (ByteLength, ByteLength),

    /// Average upload speed so far in bytes/second.
    pub upload_speed: ByteLength,

    /// Number of bytes downloaded / estimated total.
    ///
    /// The first value is the number of bytes downloaded, the second is the total.
    pub download_progress: (ByteLength, ByteLength),

    /// Average download speed so far in bytes/second.
    pub download_speed: ByteLength,

    /// Total time from the start of the request until DNS name resolving was completed.
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub name_lookup_time: Duration,

    /// Amount of time taken to establish a connection to the server (not including TLS connection time).
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub connect_time: Duration,

    /// Amount of time spent on TLS handshakes.
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub secure_connect_time: Duration,

    /// Time it took from the start of the request until the first byte is either sent or received.
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub transfer_start_time: Duration,

    /// Amount of time spent performing the actual request transfer. The “transfer” includes
    /// both sending the request and receiving the response.
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub transfer_time: Duration,

    /// Total time for the entire request. This will continuously increase until the entire
    /// response body is consumed and completed.
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub total_time: Duration,

    /// If automatic redirect following is enabled, the total time taken for all redirection steps
    /// including name lookup, connect, pre-transfer and transfer before final transaction was started.
    pub redirect_time: Duration,
}
1009impl Metrics {
1010    /// Init from `isahc::Metrics`.
1011    pub fn from_isahc(m: &isahc::Metrics) -> Self {
1012        Self {
1013            upload_progress: {
1014                let (c, t) = m.upload_progress();
1015                ((c as usize).bytes(), (t as usize).bytes())
1016            },
1017            upload_speed: (m.upload_speed().round() as usize).bytes(),
1018            download_progress: {
1019                let (c, t) = m.download_progress();
1020                ((c as usize).bytes(), (t as usize).bytes())
1021            },
1022            download_speed: (m.download_speed().round() as usize).bytes(),
1023            name_lookup_time: m.name_lookup_time(),
1024            connect_time: m.connect_time(),
1025            secure_connect_time: m.secure_connect_time(),
1026            transfer_start_time: m.transfer_start_time(),
1027            transfer_time: m.transfer_time(),
1028            total_time: m.total_time(),
1029            redirect_time: m.redirect_time(),
1030        }
1031    }
1032
1033    /// All zeros.
1034    pub fn zero() -> Self {
1035        Self {
1036            upload_progress: (0.bytes(), 0.bytes()),
1037            upload_speed: 0.bytes(),
1038            download_progress: (0.bytes(), 0.bytes()),
1039            download_speed: 0.bytes(),
1040            name_lookup_time: Duration::ZERO,
1041            connect_time: Duration::ZERO,
1042            secure_connect_time: Duration::ZERO,
1043            transfer_start_time: Duration::ZERO,
1044            transfer_time: Duration::ZERO,
1045            total_time: Duration::ZERO,
1046            redirect_time: Duration::ZERO,
1047        }
1048    }
1049}
1050impl From<isahc::Metrics> for Metrics {
1051    fn from(m: isahc::Metrics) -> Self {
1052        Metrics::from_isahc(&m)
1053    }
1054}
1055impl fmt::Display for Metrics {
1056    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1057        let mut ws = false; // written something
1058
1059        if self.upload_progress.0 != self.upload_progress.1 {
1060            write!(
1061                f,
1062                "↑ {} - {}, {}/s",
1063                self.upload_progress.0, self.upload_progress.1, self.upload_speed
1064            )?;
1065            ws = true;
1066        }
1067        if self.download_progress.0 != self.download_progress.1 {
1068            write!(
1069                f,
1070                "{}↓ {} - {}, {}/s",
1071                if ws { "\n" } else { "" },
1072                self.download_progress.0,
1073                self.download_progress.1,
1074                self.download_speed
1075            )?;
1076            ws = true;
1077        }
1078
1079        if !ws {
1080            if self.upload_progress.1.bytes() > 0 {
1081                write!(f, "↑ {}", self.upload_progress.1)?;
1082                ws = true;
1083            }
1084            if self.download_progress.1.bytes() > 0 {
1085                write!(f, "{}↓ {}", if ws { "\n" } else { "" }, self.download_progress.1)?;
1086                ws = true;
1087            }
1088
1089            if ws {
1090                write!(f, "\n{:?}", self.total_time)?;
1091            }
1092        }
1093
1094        Ok(())
1095    }
1096}
1097impl_from_and_into_var! {
1098    fn from(metrics: Metrics) -> Progress {
1099        let mut status = Progress::indeterminate();
1100        if metrics.download_progress.1 > 0.bytes() {
1101            status = Progress::from_n_of(metrics.download_progress.0.0, metrics.download_progress.1.0);
1102        }
1103        if metrics.upload_progress.1 > 0.bytes() {
1104            let u_status = Progress::from_n_of(metrics.upload_progress.0.0, metrics.upload_progress.1.0);
1105            if status.is_indeterminate() {
1106                status = u_status;
1107            } else {
1108                status = status.and_fct(u_status.fct());
1109            }
1110        }
1111        status.with_msg(formatx!("{metrics}")).with_meta_mut(|mut m| {
1112            m.set(*METRICS_ID, metrics);
1113        })
1114    }
1115}
zng_state_map::static_id! {
    /// Metrics in a [`Progress::with_meta`] metadata.
    ///
    /// The conversion from [`Metrics`] to [`Progress`] sets this value in the progress metadata.
    pub static ref METRICS_ID: zng_state_map::StateId<Metrics>;
}
1120
/// HTTP client.
///
/// An HTTP client acts as a session for executing one or more HTTP requests.
pub struct Client {
    // inner isahc client that sends the requests.
    client: isahc::HttpClient,
    // cache database, caching is only used when this is set.
    cache: Option<Box<dyn CacheDb>>,
    // selects the cache mode for each request that can be cached.
    cache_mode: Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>,
}
1129impl Default for Client {
1130    fn default() -> Self {
1131        Self::new()
1132    }
1133}
1134impl Clone for Client {
1135    fn clone(&self) -> Self {
1136        Client {
1137            client: self.client.clone(),
1138            cache: self.cache.as_ref().map(|b| b.clone_boxed()),
1139            cache_mode: self.cache_mode.clone(),
1140        }
1141    }
1142}
1143impl fmt::Debug for Client {
1144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1145        f.debug_struct("Client").finish_non_exhaustive()
1146    }
1147}
1148impl Client {
1149    /// New client with default config.
1150    ///
1151    /// This enables cookies, sets the `redirect_policy` with a limit of up-to 20 redirects and `auto_referer`, sets
1152    /// the `connect_timeout` to 90 seconds and enables `metrics`.
1153    pub fn new() -> Self {
1154        Client::builder()
1155            .cookies()
1156            .redirect_policy(RedirectPolicy::Limit(20))
1157            .connect_timeout(90.secs())
1158            .metrics(true)
1159            .build()
1160    }
1161
1162    /// Start a new [`ClientBuilder`] for creating a custom client.
1163    pub fn builder() -> ClientBuilder {
1164        ClientBuilder {
1165            builder: isahc::HttpClient::builder(),
1166            cache: None,
1167            cache_mode: None,
1168        }
1169    }
1170
1171    /// Gets the configured cookie-jar for this client, if cookies are enabled.
1172    pub fn cookie_jar(&self) -> Option<&CookieJar> {
1173        self.client.cookie_jar()
1174    }
1175
1176    /// Send a GET request to the `uri`.
1177    pub async fn get(&self, uri: impl TryUri) -> Result<Response, Error> {
1178        self.send(Request::get(uri)?.build()).await
1179    }
1180
1181    /// Send a GET request to the `uri` and read the response as a string.
1182    pub async fn get_txt(&self, uri: impl TryUri) -> Result<Txt, Error> {
1183        let mut r = self.get(uri).await?;
1184        let r = r.text().await?;
1185        Ok(r)
1186    }
1187
1188    /// Send a GET request to the `uri` and read the response as raw bytes.
1189    pub async fn get_bytes(&self, uri: impl TryUri) -> Result<Vec<u8>, Error> {
1190        let mut r = self.get(uri).await?;
1191        let r = r.bytes().await?;
1192        Ok(r)
1193    }
1194
1195    /// Send a GET request to the `uri` and de-serializes the response.
1196    pub async fn get_json<O>(&self, uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
1197    where
1198        O: serde::de::DeserializeOwned + std::marker::Unpin,
1199    {
1200        let mut r = self.get(uri).await?;
1201        let r = r.json::<O>().await?;
1202        Ok(r)
1203    }
1204
1205    /// Send a HEAD request to the `uri`.
1206    pub async fn head(&self, uri: impl TryUri) -> Result<Response, Error> {
1207        self.send(Request::head(uri)?.build()).await
1208    }
1209    /// Send a PUT request to the `uri` with a given request body.
1210    pub async fn put(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1211        self.send(Request::put(uri)?.body(body)?).await
1212    }
1213
1214    /// Send a POST request to the `uri` with a given request body.
1215    pub async fn post(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1216        self.send(Request::post(uri)?.body(body)?).await
1217    }
1218
1219    /// Send a DELETE request to the `uri`.
1220    pub async fn delete(&self, uri: impl TryUri) -> Result<Response, Error> {
1221        self.send(Request::delete(uri)?.build()).await
1222    }
1223
1224    /// Send a custom [`Request`].
1225    ///
1226    /// # Cache
1227    ///
1228    /// If the client has a [`cache`] and the request uses the `GET` method the result will be cached
1229    /// according with the [`cache_mode`] selected for the request.
1230    ///
1231    /// [`cache`]: Self::cache
1232    /// [`cache_mode`]: Self::cache_mode
1233    pub async fn send(&self, request: Request) -> Result<Response, Error> {
1234        if let Some(db) = &self.cache {
1235            match self.cache_mode(&request) {
1236                CacheMode::NoCache => {
1237                    let response = self.client.send_async(request.req).await?;
1238                    let response = request.limits.check(response)?;
1239                    Ok(Response(response))
1240                }
1241                CacheMode::Default => self.send_cache_default(&**db, request, 0).await,
1242                CacheMode::Permanent => self.send_cache_permanent(&**db, request, 0).await,
1243                CacheMode::Error(e) => Err(e),
1244            }
1245        } else {
1246            let response = self.client.send_async(request.req).await?;
1247            let response = request.limits.check(response)?;
1248            Ok(Response(response))
1249        }
1250    }
1251
    /// Sends the `request` using the cache `db`, this is the [`CacheMode::Default`] path of [`send`].
    ///
    /// If the `db` has a policy for the request the cached body is used when the policy says it is fresh, if the
    /// policy is stale the request is sent again to the server and the cache entry is updated from the response.
    /// If the `db` has no policy the request is sent and the response is stored when the new policy allows it.
    ///
    /// When the `db` fails (policy without body or body that cannot be stored) the entry is removed and the request
    /// is retried by calling this method again with `retry_count + 1`. After three retries the cache is skipped.
    ///
    /// [`send`]: Self::send
    #[async_recursion::async_recursion]
    async fn send_cache_default(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
        // too many cache failures, send without cache.
        if retry_count == 3 {
            tracing::error!("retried cache 3 times, skipping cache");
            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;
            return Ok(Response(response));
        }

        let key = CacheKey::new(&request.req);
        if let Some(policy) = db.policy(&key).await {
            // has cache entry, the policy selects how it is used.
            match policy.before_request(&request.req) {
                BeforeRequest::Fresh(parts) => {
                    // cached response can be used without a network request.
                    if let Some(body) = db.body(&key).await {
                        let response = isahc::Response::from_parts(parts, body.0);
                        let response = request.limits.check(response)?;

                        Ok(Response(response))
                    } else {
                        // invalid entry, remove and try again, the request was not consumed.
                        tracing::error!("cache returned policy but not body");
                        db.remove(&key).await;
                        self.send_cache_default(db, request, retry_count + 1).await
                    }
                }
                BeforeRequest::Stale { request: parts, matches } => {
                    if matches {
                        // sends the request again using the request parts provided by the policy and the original body.
                        let (_, body) = request.req.into_parts();
                        let request = Request {
                            req: isahc::Request::from_parts(parts, body),
                            limits: request.limits,
                        };
                        // copy of the request with `()` body, used by the policy and by the auto-retry.
                        // NOTE(review): `clone_with(())` is unwrapped, presumably it cannot fail for `()` — confirm.
                        let policy_request = request.clone_with(()).unwrap().req;
                        // only `true` if the body length is known and zero, the auto-retry uses `policy_request`
                        // because the original request is moved into `send_async`.
                        let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);

                        let response = self.client.send_async(request.req).await?;
                        let response = request.limits.check(response)?;

                        match policy.after_response(&policy_request, &response) {
                            AfterResponse::NotModified(policy, parts) => {
                                // cached body is still valid, respond with it and store the updated policy.
                                if let Some(body) = db.body(&key).await {
                                    let response = isahc::Response::from_parts(parts, body.0);

                                    db.set_policy(&key, policy).await;

                                    Ok(Response(response))
                                } else {
                                    tracing::error!("cache returned policy but not body");
                                    db.remove(&key).await;

                                    if no_req_body {
                                        self.send_cache_default(
                                            db,
                                            Request {
                                                req: policy_request,
                                                limits: request.limits,
                                            },
                                            retry_count + 1,
                                        )
                                        .await
                                    } else {
                                        Err(std::io::Error::new(
                                            std::io::ErrorKind::NotFound,
                                            "cache returned policy but not body, cannot auto-retry",
                                        )
                                        .into())
                                    }
                                }
                            }
                            AfterResponse::Modified(policy, parts) => {
                                if policy.should_store() {
                                    // replaces the cache entry, the response uses the body returned by the `db`.
                                    let (_, body) = response.into_parts();
                                    if let Some(body) = db.set(&key, policy, Body(body)).await {
                                        let response = isahc::Response::from_parts(parts, body.0);

                                        Ok(Response(response))
                                    } else {
                                        // the response body was moved into `db.set`, can only retry the request.
                                        tracing::error!("cache db failed to store body");
                                        db.remove(&key).await;

                                        if no_req_body {
                                            self.send_cache_default(
                                                db,
                                                Request {
                                                    req: policy_request,
                                                    limits: request.limits,
                                                },
                                                retry_count + 1,
                                            )
                                            .await
                                        } else {
                                            Err(std::io::Error::new(
                                                std::io::ErrorKind::NotFound,
                                                "cache db failed to store body, cannot auto-retry",
                                            )
                                            .into())
                                        }
                                    }
                                } else {
                                    // new response must not be stored, remove the old entry and return the network response.
                                    db.remove(&key).await;

                                    Ok(Response(response))
                                }
                            }
                        }
                    } else {
                        // cached policy is not for this request, remove the entry and send without cache.
                        tracing::error!("cache policy did not match request, {request:?}");
                        db.remove(&key).await;
                        let response = self.client.send_async(request.req).await?;
                        let response = request.limits.check(response)?;
                        Ok(Response(response))
                    }
                }
            }
        } else {
            // no cache entry, send the request and store the response if the new policy allows it.

            // only `true` if the body length is known and zero, see the auto-retry below.
            let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);
            // copy of the request with `()` body, used to create the policy and by the auto-retry.
            let policy_request = request.clone_with(()).unwrap().req;

            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;

            let policy = CachePolicy::new(&policy_request, &response);

            if policy.should_store() {
                let (parts, body) = response.into_parts();

                if let Some(body) = db.set(&key, policy, Body(body)).await {
                    let response = isahc::Response::from_parts(parts, body.0);

                    Ok(Response(response))
                } else {
                    // the response body was moved into `db.set`, can only retry the request.
                    tracing::error!("cache db failed to store body");
                    db.remove(&key).await;

                    if no_req_body {
                        self.send_cache_default(
                            db,
                            Request {
                                req: policy_request,
                                limits: request.limits,
                            },
                            retry_count + 1,
                        )
                        .await
                    } else {
                        Err(std::io::Error::new(std::io::ErrorKind::NotFound, "cache db failed to store body, cannot auto-retry").into())
                    }
                }
            } else {
                Ok(Response(response))
            }
        }
    }
1404
    /// Sends the `request` using the cache `db`, this is the [`CacheMode::Permanent`] path of [`send`].
    ///
    /// If the `db` has a policy and a body for the request the cached body is always used, the policy is replaced
    /// with a permanent policy if it was not permanent. If the `db` has no policy the request is sent and the
    /// response is stored with a permanent policy.
    ///
    /// When the `db` fails (policy without body or body that cannot be stored) the entry is removed and the request
    /// is retried by calling this method again with `retry_count + 1`. After three retries the cache is skipped.
    ///
    /// [`send`]: Self::send
    #[async_recursion::async_recursion]
    async fn send_cache_permanent(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
        // too many cache failures, send without cache.
        if retry_count == 3 {
            tracing::error!("retried cache 3 times, skipping cache");
            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;
            return Ok(Response(response));
        }

        let key = CacheKey::new(&request.req);
        if let Some(policy) = db.policy(&key).await {
            if let Some(body) = db.body(&key).await {
                match policy.before_request(&request.req) {
                    BeforeRequest::Fresh(p) => {
                        // cached response can be used without a network request.
                        let response = isahc::Response::from_parts(p, body.0);
                        let response = request.limits.check(response)?;

                        // upgrade the entry cached with another mode to permanent.
                        if !policy.is_permanent() {
                            db.set_policy(&key, CachePolicy::new_permanent(&response)).await;
                        }

                        Ok(Response(response))
                    }
                    BeforeRequest::Stale { request: parts, .. } => {
                        // policy was not permanent when cached

                        let limits = request.limits.clone();

                        // sends the request using the request parts provided by the policy and the original body.
                        let (_, req_body) = request.req.into_parts();
                        let request = isahc::Request::from_parts(parts, req_body);

                        let response = self.client.send_async(request).await?;
                        let response = limits.check(response)?;

                        // only the parts of the network response are used, the network body is discarded.
                        let (parts, _) = response.into_parts();

                        // the response is the network parts with the cached body.
                        let response = isahc::Response::from_parts(parts, body.0);

                        db.set_policy(&key, CachePolicy::new_permanent(&response)).await;

                        Ok(Response(response))
                    }
                }
            } else {
                // invalid entry, remove and try again, the request was not consumed.
                tracing::error!("cache returned policy but not body");
                db.remove(&key).await;
                self.send_cache_permanent(db, request, retry_count + 1).await
            }
        } else {
            // no cache entry, send the request and store the response with a permanent policy.

            // the request can only be cloned for the auto-retry if the body length is known and zero.
            // NOTE(review): `clone_with(())` is unwrapped, presumably it cannot fail for `()` — confirm.
            let backup_request = if request.req.body().len().map(|l| l == 0).unwrap_or(false) {
                Some(request.clone_with(()).unwrap())
            } else {
                None
            };

            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;
            let policy = CachePolicy::new_permanent(&response);

            let (parts, body) = response.into_parts();

            // the response uses the body returned by the `db`.
            if let Some(body) = db.set(&key, policy, Body(body)).await {
                let response = isahc::Response::from_parts(parts, body.0);
                Ok(Response(response))
            } else {
                // the response body was moved into `db.set`, can only retry the request.
                tracing::error!("cache db failed to store body");
                db.remove(&key).await;

                if let Some(request) = backup_request {
                    self.send_cache_permanent(db, request, retry_count + 1).await
                } else {
                    Err(std::io::Error::new(
                        std::io::ErrorKind::NotFound,
                        "cache db failed to store permanent body, cannot auto-retry",
                    )
                    .into())
                }
            }
        }
    }
1485
1486    /// Reference the cache used in this client.
1487    pub fn cache(&self) -> Option<&dyn CacheDb> {
1488        self.cache.as_deref()
1489    }
1490
1491    /// Returns the [`CacheMode`] that is used in this client if the request is made.
1492    pub fn cache_mode(&self, request: &Request) -> CacheMode {
1493        if self.cache.is_none() || request.method() != Method::GET {
1494            CacheMode::NoCache
1495        } else {
1496            (self.cache_mode)(request)
1497        }
1498    }
1499}
1500impl From<Client> for isahc::HttpClient {
1501    fn from(c: Client) -> Self {
1502        c.client
1503    }
1504}
1505impl From<isahc::HttpClient> for Client {
1506    fn from(client: isahc::HttpClient) -> Self {
1507        Self {
1508            client,
1509            cache: None,
1510            cache_mode: Arc::new(|_| CacheMode::default()),
1511        }
1512    }
1513}
1514
/// Builder that can be used to create a [`Client`].
///
/// Use [`Client::builder`] to start building.
///
/// # Examples
///
/// ```
/// use zng_task::http::*;
///
/// let client = Client::builder().metrics(true).build();
/// ```
pub struct ClientBuilder {
    // inner isahc builder, most of the config methods forward to it.
    builder: isahc::HttpClientBuilder,
    // cache database for the client, `None` by default.
    cache: Option<Box<dyn CacheDb>>,
    // cache mode selector for the client, if `None` the built client selects `CacheMode::default()` for every request.
    cache_mode: Option<Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>>,
}
1531impl Default for ClientBuilder {
1532    fn default() -> Self {
1533        Client::builder()
1534    }
1535}
1536impl ClientBuilder {
1537    /// New default builder.
1538    pub fn new() -> Self {
1539        Client::builder()
1540    }
1541
1542    /// Build the [`Client`] using the configured options.
1543    pub fn build(self) -> Client {
1544        Client {
1545            client: self.builder.build().unwrap(),
1546            cache: self.cache,
1547            cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1548        }
1549    }
1550
1551    /// Build the client with more custom build calls in the [inner builder].
1552    ///
1553    /// [inner builder]: isahc::HttpClientBuilder
1554    pub fn build_custom<F>(self, custom: F) -> Result<Client, Error>
1555    where
1556        F: FnOnce(isahc::HttpClientBuilder) -> Result<isahc::HttpClient, Error>,
1557    {
1558        custom(self.builder).map(|c| Client {
1559            client: c,
1560            cache: self.cache,
1561            cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1562        })
1563    }
1564
1565    /// Add a default header to be passed with every request.
1566    pub fn default_header(self, key: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
1567        Ok(Self {
1568            builder: self.builder.default_header(key.try_header_name()?, value.try_header_value()?),
1569            cache: self.cache,
1570            cache_mode: self.cache_mode,
1571        })
1572    }
1573
1574    /// Enable persistent cookie handling for all requests using this client using a shared cookie jar.
1575    pub fn cookies(self) -> Self {
1576        Self {
1577            builder: self.builder.cookies(),
1578            cache: self.cache,
1579            cache_mode: self.cache_mode,
1580        }
1581    }
1582
1583    /// Set a cookie jar to use to accept, store, and supply cookies for incoming responses and outgoing requests.
1584    ///
1585    /// Note that the [`default_client`] already has a cookie jar.
1586    pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
1587        Self {
1588            builder: self.builder.cookie_jar(cookie_jar),
1589            cache: self.cache,
1590            cache_mode: self.cache_mode,
1591        }
1592    }
1593
1594    /// Specify a maximum amount of time that a complete request/response cycle is allowed to
1595    /// take before being aborted. This includes DNS resolution, connecting to the server,
1596    /// writing the request, and reading the response.
1597    ///
1598    /// Note that this includes the response read operation, so if you get a response but don't
1599    /// read-it within this timeout you will get a [`TimedOut`] IO error.
1600    ///
1601    /// By default no timeout is used.
1602    ///
1603    /// [`TimedOut`]: https://doc.rust-lang.org/nightly/std/io/enum.ErrorKind.html#variant.TimedOut
1604    pub fn timeout(self, timeout: Duration) -> Self {
1605        Self {
1606            builder: self.builder.timeout(timeout),
1607            cache: self.cache,
1608            cache_mode: self.cache_mode,
1609        }
1610    }
1611
1612    /// Set a timeout for establishing connections to a host.
1613    ///
1614    /// If not set, the [`default_client`] default of 90 seconds will be used.
1615    pub fn connect_timeout(self, timeout: Duration) -> Self {
1616        Self {
1617            builder: self.builder.connect_timeout(timeout),
1618            cache: self.cache,
1619            cache_mode: self.cache_mode,
1620        }
1621    }
1622
1623    /// Specify a maximum amount of time where transfer rate can go below a minimum speed limit.
1624    ///
1625    /// The `low_speed` limit is in bytes/s. No low-speed limit is configured by default.
1626    pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
1627        Self {
1628            builder: self.builder.low_speed_timeout(low_speed, timeout),
1629            cache: self.cache,
1630            cache_mode: self.cache_mode,
1631        }
1632    }
1633
1634    /// Set a policy for automatically following server redirects.
1635    ///
1636    /// If enabled the "Referer" header will be set automatically too.
1637    pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
1638        if !matches!(policy, RedirectPolicy::None) {
1639            Self {
1640                builder: self.builder.redirect_policy(policy).auto_referer(),
1641                cache: self.cache,
1642                cache_mode: self.cache_mode,
1643            }
1644        } else {
1645            Self {
1646                builder: self.builder.redirect_policy(policy),
1647                cache: self.cache,
1648                cache_mode: self.cache_mode,
1649            }
1650        }
1651    }
1652
1653    /// Enable or disable automatic decompression of the response body.
1654    ///
1655    /// If enabled the "Accept-Encoding" will also be set automatically, if it was not set using [`default_header`].
1656    ///
1657    /// This is enabled by default.
1658    ///
1659    /// [`default_header`]: Self::default_header
1660    pub fn auto_decompress(self, enabled: bool) -> Self {
1661        Self {
1662            builder: self.builder.automatic_decompression(enabled),
1663            cache: self.cache,
1664            cache_mode: self.cache_mode,
1665        }
1666    }
1667
1668    /// Set a maximum upload speed for the request body, in bytes per second.
1669    pub fn max_upload_speed(self, max: u64) -> Self {
1670        Self {
1671            builder: self.builder.max_upload_speed(max),
1672            cache: self.cache,
1673            cache_mode: self.cache_mode,
1674        }
1675    }
1676
1677    /// Set a maximum download speed for the response body, in bytes per second.
1678    pub fn max_download_speed(self, max: u64) -> Self {
1679        Self {
1680            builder: self.builder.max_download_speed(max),
1681            cache: self.cache,
1682            cache_mode: self.cache_mode,
1683        }
1684    }
1685
1686    /// Enable or disable metrics collecting.
1687    ///
1688    /// When enabled you can get the information using the [`Response::metrics`] method.
1689    ///
1690    /// This is enabled by default.
1691    pub fn metrics(self, enable: bool) -> Self {
1692        Self {
1693            builder: self.builder.metrics(enable),
1694            cache: self.cache,
1695            cache_mode: self.cache_mode,
1696        }
1697    }
1698
1699    /// Sets the [`CacheDb`] to use.
1700    ///
1701    /// Caching is only enabled if there is a DB, no caching is done by default.
1702    pub fn cache(self, cache: impl CacheDb) -> Self {
1703        Self {
1704            builder: self.builder,
1705            cache: Some(Box::new(cache)),
1706            cache_mode: self.cache_mode,
1707        }
1708    }
1709
1710    /// Sets the [`CacheMode`] selector.
1711    ///
1712    /// The `selector` closure is called for every cacheable request before it is made, it
1713    /// must return a [`CacheMode`] value that configures how the [`cache`] is used.
1714    ///
1715    /// Note that the closure is only called if a [`cache`] is set.
1716    ///
1717    /// [`cache`]: Self::cache
1718    pub fn cache_mode(self, selector: impl Fn(&Request) -> CacheMode + Send + Sync + 'static) -> Self {
1719        Self {
1720            builder: self.builder,
1721            cache: self.cache,
1722            cache_mode: Some(Arc::new(selector)),
1723        }
1724    }
1725}
1726
1727/// An error encountered while sending an HTTP request or receiving an HTTP response using a [`Client`].
1728#[derive(Debug, Clone)]
1729#[non_exhaustive]
1730pub enum Error {
1731    /// Error from the HTTP client.
1732    Client(isahc::Error),
1733    /// Error when [`max_length`] validation fails at the header or after streaming download.
1734    ///
1735    /// [`max_length`]: RequestBuilder::max_length
1736    MaxLength {
1737        /// The `Content-Length` header value, if it was set.
1738        content_length: Option<ByteLength>,
1739        /// The maximum allowed length.
1740        max_length: ByteLength,
1741    },
1742    /// Error when [`require_length`] is set, but a response was sent without the `Content-Length` header.
1743    ///
1744    /// [`require_length`]: RequestBuilder::require_length
1745    RequireLength,
1746}
1747impl StdError for Error {
1748    fn source(&self) -> Option<&(dyn StdError + 'static)> {
1749        match self {
1750            Error::Client(e) => Some(e),
1751            _ => None,
1752        }
1753    }
1754}
1755impl From<isahc::Error> for Error {
1756    fn from(e: isahc::Error) -> Self {
1757        if let Some(e) = e
1758            .source()
1759            .and_then(|e| e.downcast_ref::<std::io::Error>())
1760            .and_then(|e| e.get_ref())
1761        {
1762            if let Some(e) = e.downcast_ref::<MaxLengthError>() {
1763                return Error::MaxLength {
1764                    content_length: e.0,
1765                    max_length: e.1,
1766                };
1767            }
1768            if e.downcast_ref::<RequireLengthError>().is_some() {
1769                return Error::RequireLength;
1770            }
1771        }
1772        Error::Client(e)
1773    }
1774}
1775impl From<isahc::http::Error> for Error {
1776    fn from(e: isahc::http::Error) -> Self {
1777        isahc::Error::from(e).into()
1778    }
1779}
1780impl From<std::io::Error> for Error {
1781    fn from(e: std::io::Error) -> Self {
1782        isahc::Error::from(e).into()
1783    }
1784}
1785impl fmt::Display for Error {
1786    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1787        match self {
1788            Error::Client(e) => write!(f, "{e}"),
1789            Error::MaxLength {
1790                content_length,
1791                max_length,
1792            } => write!(f, "{}", MaxLengthError(*content_length, *max_length)),
1793            Error::RequireLength => write!(f, "{}", RequireLengthError {}),
1794        }
1795    }
1796}
1797
1798// Error types smuggled inside an io::Error inside the isahc::Error.
1799
1800#[derive(Debug)]
1801struct MaxLengthError(Option<ByteLength>, ByteLength);
1802impl fmt::Display for MaxLengthError {
1803    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1804        if let Some(l) = self.0 {
1805            write!(f, "content-length of {l} exceeds limit of {}", self.1)
1806        } else {
1807            write!(f, "download reached limit of {}", self.1)
1808        }
1809    }
1810}
1811impl StdError for MaxLengthError {}
1812
/// Marker error for a required content length.
#[non_exhaustive]
#[derive(Debug)]
struct RequireLengthError {}
impl fmt::Display for RequireLengthError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("content-length is required")
    }
}
impl StdError for RequireLengthError {}