zng_task/
http.rs

1#![cfg(feature = "http")]
2// suppress nag about very simple boxed closure signatures.
3#![expect(clippy::type_complexity)]
4
5//! HTTP client.
6//!
//! This module is a thin wrapper around the [`isahc`] crate that limits the API surface to only
//! `async` methods, named without the async suffix. You can convert between the [`isahc`] types and the types in this module.
9//!
10//! # Examples
11//!
12//! Get some text:
13//!
14//! ```
15//! # use zng_task as task;
16//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
17//! let text = task::http::get_txt("https://httpbin.org/base64/SGVsbG8gV29ybGQ=").await?;
18//! println!("{text}!");
19//! # Ok(()) }
20//! ```
21//!
22//! [`isahc`]: https://docs.rs/isahc
23
24mod cache;
25mod util;
26
27pub use cache::*;
28use zng_var::impl_from_and_into_var;
29
30use std::convert::TryFrom;
31use std::error::Error as StdError;
32use std::pin::Pin;
33use std::sync::Arc;
34use std::time::Duration;
35use std::{fmt, mem};
36
37use crate::Progress;
38
39use super::io::AsyncRead;
40
41use isahc::config::Configurable;
42pub use isahc::config::RedirectPolicy;
43pub use isahc::cookies::{Cookie, CookieJar};
44pub use isahc::http::{Method, StatusCode, Uri, header, uri};
45
46use futures_lite::io::{AsyncReadExt, BufReader};
47use isahc::{AsyncReadResponseExt, ResponseExt};
48use parking_lot::{Mutex, const_mutex};
49
50use zng_txt::{Txt, formatx};
51use zng_unit::*;
52
53/// Marker trait for types that try-to-convert to [`Uri`].
54///
55/// All types `T` that match `Uri: TryFrom<T>, <Uri as TryFrom<T>>::Error: Into<isahc::http::Error>` implement this trait.
56#[diagnostic::on_unimplemented(note = "`TryUri` is implemented for all `T` where `Uri: TryFrom<T, Error: Into<isahc::http::Error>>`")]
57pub trait TryUri {
58    /// Tries to convert `self` into [`Uri`].
59    fn try_uri(self) -> Result<Uri, Error>;
60}
61impl<U> TryUri for U
62where
63    Uri: TryFrom<U>,
64    <Uri as TryFrom<U>>::Error: Into<isahc::http::Error>,
65{
66    fn try_uri(self) -> Result<Uri, Error> {
67        Uri::try_from(self).map_err(|e| e.into().into())
68    }
69}
70
71/// Marker trait for types that try-to-convert to [`Method`].
72///
73/// All types `T` that match `Method: TryFrom<T>, <Method as TryFrom<T>>::Error: Into<isahc::http::Error>` implement this trait.
74#[diagnostic::on_unimplemented(note = "`TryMethod` is implemented for all `T` where `Method: TryFrom<T, Error: Into<isahc::http::Error>>`")]
75pub trait TryMethod {
76    /// Tries to convert `self` into [`Method`].
77    fn try_method(self) -> Result<Method, Error>;
78}
79impl<U> TryMethod for U
80where
81    Method: TryFrom<U>,
82    <isahc::http::Method as TryFrom<U>>::Error: Into<isahc::http::Error>,
83{
84    fn try_method(self) -> Result<Method, Error> {
85        Method::try_from(self).map_err(|e| e.into().into())
86    }
87}
88
89/// Marker trait for types that try-to-convert to [`Body`].
90///
91/// All types `T` that match `isahc::AsyncBody: TryFrom<T>, <isahc::AsyncBody as TryFrom<T>>::Error: Into<isahc::http::Error>`
92/// implement this trait.
93#[diagnostic::on_unimplemented(note = "`TryBody` is implemented for all `T` where `Body: TryFrom<T, Error: Into<isahc::http::Error>>`")]
94pub trait TryBody {
95    /// Tries to convert `self` into [`Body`].
96    fn try_body(self) -> Result<Body, Error>;
97}
98impl<U> TryBody for U
99where
100    isahc::AsyncBody: TryFrom<U>,
101    <isahc::AsyncBody as TryFrom<U>>::Error: Into<isahc::http::Error>,
102{
103    fn try_body(self) -> Result<Body, Error> {
104        match isahc::AsyncBody::try_from(self) {
105            Ok(r) => Ok(Body(r)),
106            Err(e) => Err(e.into().into()),
107        }
108    }
109}
110
111/// Marker trait for types that try-to-convert to [`header::HeaderName`].
112///
113/// All types `T` that match `header::HeaderName: TryFrom<T>, <header::HeaderName as TryFrom<T>>::Error: Into<isahc::http::Error>`
114/// implement this trait.
115#[diagnostic::on_unimplemented(
116    note = "`TryHeaderName` is implemented for all `T` where `HeaderName: TryFrom<T, Error: Into<isahc::http::Error>>`"
117)]
118pub trait TryHeaderName {
119    /// Tries to convert `self` into [`Body`].
120    fn try_header_name(self) -> Result<header::HeaderName, Error>;
121}
122impl<U> TryHeaderName for U
123where
124    header::HeaderName: TryFrom<U>,
125    <header::HeaderName as TryFrom<U>>::Error: Into<isahc::http::Error>,
126{
127    fn try_header_name(self) -> Result<header::HeaderName, Error> {
128        header::HeaderName::try_from(self).map_err(|e| e.into().into())
129    }
130}
131
132/// Marker trait for types that try-to-convert to [`header::HeaderValue`].
133///
134/// All types `T` that match `header::HeaderValue: TryFrom<T>, <header::HeaderValue as TryFrom<T>>::Error: Into<isahc::http::Error>`
135/// implement this trait.
136#[diagnostic::on_unimplemented(
137    note = "`TryHeaderValue` is implemented for all `T` where `HeaderValue: TryFrom<T, Error: Into<isahc::http::Error>>`"
138)]
139pub trait TryHeaderValue {
140    /// Tries to convert `self` into [`Body`].
141    fn try_header_value(self) -> Result<header::HeaderValue, Error>;
142}
143impl<U> TryHeaderValue for U
144where
145    header::HeaderValue: TryFrom<U>,
146    <header::HeaderValue as TryFrom<U>>::Error: Into<isahc::http::Error>,
147{
148    fn try_header_value(self) -> Result<header::HeaderValue, Error> {
149        header::HeaderValue::try_from(self).map_err(|e| e.into().into())
150    }
151}
152
153/// HTTP request.
154///
155/// Use [`send`] to send a request.
156#[derive(Debug)]
157pub struct Request {
158    req: isahc::Request<Body>,
159    limits: ResponseLimits,
160}
161impl Request {
162    /// Starts an empty builder.
163    ///
164    /// # Examples
165    ///
166    /// ```
167    /// use zng_task::http;
168    ///
169    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
170    /// let request = http::Request::builder().method(http::Method::PUT)?.uri("https://httpbin.org/put")?.build();
171    /// # Ok(()) }
172    /// ```
173    ///
174    /// Call [`build`] or [`body`] to finish building the request, note that there are is also an associated function
175    /// to start a builder for each HTTP method and uri.
176    ///
177    /// [`build`]: RequestBuilder::build
178    /// [`body`]: RequestBuilder::body
179    pub fn builder() -> RequestBuilder {
180        RequestBuilder::start(isahc::Request::builder())
181    }
182
183    /// Starts building a GET request.
184    ///
185    /// # Examples
186    ///
187    /// ```
188    /// use zng_task::http;
189    ///
190    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
191    /// let get = http::Request::get("https://httpbin.org/get")?.build();
192    /// # Ok(()) }
193    /// ```
194    pub fn get(uri: impl TryUri) -> Result<RequestBuilder, Error> {
195        Ok(RequestBuilder::start(isahc::Request::get(uri.try_uri()?)))
196    }
197
198    /// Starts building a PUT request.
199    ///
200    /// # Examples
201    ///
202    /// ```
203    /// use zng_task::http;
204    ///
205    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
206    /// let put = http::Request::put("https://httpbin.org/put")?.header("accept", "application/json")?.build();
207    /// # Ok(()) }
208    /// ```
209    pub fn put(uri: impl TryUri) -> Result<RequestBuilder, Error> {
210        Ok(RequestBuilder::start(isahc::Request::put(uri.try_uri()?)))
211    }
212
213    /// Starts building a POST request.
214    ///
215    /// # Examples
216    ///
217    /// ```
218    /// use zng_task::http;
219    ///
220    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
221    /// let post = http::Request::post("https://httpbin.org/post")?.header("accept", "application/json")?.build();
222    /// # Ok(()) }
223    /// ```
224    pub fn post(uri: impl TryUri) -> Result<RequestBuilder, Error> {
225        Ok(RequestBuilder::start(isahc::Request::post(uri.try_uri()?)))
226    }
227
228    /// Starts building a DELETE request.
229    ///
230    /// # Examples
231    ///
232    /// ```
233    /// use zng_task::http;
234    ///
235    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
236    /// let delete = http::Request::delete("https://httpbin.org/delete")?.header("accept", "application/json")?.build();
237    /// # Ok(()) }
238    /// ```
239    pub fn delete(uri: impl TryUri) -> Result<RequestBuilder, Error> {
240        Ok(RequestBuilder::start(isahc::Request::delete(uri.try_uri()?)))
241    }
242
243    /// Starts building a PATCH request.
244    ///
245    /// # Examples
246    ///
247    /// ```
248    /// use zng_task::http;
249    ///
250    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
251    /// let patch = http::Request::patch("https://httpbin.org/patch")?.header("accept", "application/json")?.build();
252    /// # Ok(()) }
253    /// ```
254    pub fn patch(uri: impl TryUri) -> Result<RequestBuilder, Error> {
255        Ok(RequestBuilder::start(isahc::Request::patch(uri.try_uri()?)))
256    }
257
258    /// Starts building a HEAD request.
259    ///
260    /// # Examples
261    ///
262    /// ```
263    /// use zng_task::http;
264    ///
265    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
266    /// let head = http::Request::head("https://httpbin.org")?.build();
267    /// # Ok(()) }
268    /// ```
269    pub fn head(uri: impl TryUri) -> Result<RequestBuilder, Error> {
270        Ok(RequestBuilder::start(isahc::Request::head(uri.try_uri()?)))
271    }
272
273    /// Returns a reference to the associated URI.
274    pub fn uri(&self) -> &Uri {
275        self.req.uri()
276    }
277
278    /// Returns a reference to the associated HTTP method.
279    pub fn method(&self) -> &Method {
280        self.req.method()
281    }
282
283    /// Returns a reference to the associated header field map.
284    pub fn headers(&self) -> &header::HeaderMap {
285        self.req.headers()
286    }
287
288    /// Create a clone of the request method, URI, version and headers, with a new `body`.
289    pub fn clone_with(&self, body: impl TryBody) -> Result<Self, Error> {
290        let body = body.try_body()?;
291
292        let mut req = isahc::Request::new(body);
293        *req.method_mut() = self.req.method().clone();
294        *req.uri_mut() = self.req.uri().clone();
295        *req.version_mut() = self.req.version();
296        let headers = req.headers_mut();
297        for (name, value) in self.headers() {
298            headers.insert(name.clone(), value.clone());
299        }
300
301        Ok(Self {
302            req,
303            limits: self.limits.clone(),
304        })
305    }
306}
307
308#[derive(Debug, Default, Clone)]
309struct ResponseLimits {
310    max_length: Option<ByteLength>,
311    require_length: bool,
312}
313impl ResponseLimits {
314    fn check(&self, response: isahc::Response<isahc::AsyncBody>) -> Result<isahc::Response<isahc::AsyncBody>, Error> {
315        if self.require_length || self.max_length.is_some() {
316            let response = Response(response);
317            if let Some(len) = response.content_len() {
318                if let Some(max) = self.max_length
319                    && max < len
320                {
321                    return Err(Error::MaxLength {
322                        content_length: Some(len),
323                        max_length: max,
324                    });
325                }
326            } else if self.require_length {
327                return Err(Error::RequireLength);
328            }
329
330            if let Some(max) = self.max_length {
331                let (parts, body) = response.0.into_parts();
332                let response = isahc::Response::from_parts(
333                    parts,
334                    isahc::AsyncBody::from_reader(super::io::ReadLimited::new(body, max, move || {
335                        std::io::Error::new(std::io::ErrorKind::InvalidData, MaxLengthError(None, max))
336                    })),
337                );
338
339                Ok(response)
340            } else {
341                Ok(response.0)
342            }
343        } else {
344            Ok(response)
345        }
346    }
347}
348
349/// A [`Request`] builder.
350///
351/// You can use [`Request::builder`] to start an empty builder.
352#[derive(Debug)]
353pub struct RequestBuilder {
354    builder: isahc::http::request::Builder,
355    limits: ResponseLimits,
356}
357impl Default for RequestBuilder {
358    fn default() -> Self {
359        Request::builder()
360    }
361}
362impl RequestBuilder {
363    /// New default request builder.
364    pub fn new() -> Self {
365        Request::builder()
366    }
367
368    fn start(builder: isahc::http::request::Builder) -> Self {
369        Self {
370            builder,
371            limits: ResponseLimits::default(),
372        }
373    }
374
375    /// Set the HTTP method for this request.
376    pub fn method(self, method: impl TryMethod) -> Result<Self, Error> {
377        Ok(Self {
378            builder: self.builder.method(method.try_method()?),
379            limits: self.limits,
380        })
381    }
382
383    /// Set the URI for this request.
384    pub fn uri(self, uri: impl TryUri) -> Result<Self, Error> {
385        Ok(Self {
386            builder: self.builder.uri(uri.try_uri()?),
387            limits: self.limits,
388        })
389    }
390
391    /// Appends a header to this request.
392    pub fn header(self, name: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
393        Ok(Self {
394            builder: self.builder.header(name.try_header_name()?, value.try_header_value()?),
395            limits: self.limits,
396        })
397    }
398
399    /// Set a cookie jar to use to accept, store, and supply cookies for incoming responses and outgoing requests.
400    ///
401    /// Note that the [`default_client`] already has a cookie jar.
402    pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
403        Self {
404            builder: self.builder.cookie_jar(cookie_jar),
405            limits: self.limits,
406        }
407    }
408
409    /// Specify a maximum amount of time that a complete request/response cycle is allowed to
410    /// take before being aborted. This includes DNS resolution, connecting to the server,
411    /// writing the request, and reading the response.
412    ///
413    /// Note that this includes the response read operation, so if you get a response but don't
414    /// read-it within this timeout you will get a [`TimedOut`] IO error.
415    ///
416    /// By default no timeout is used.
417    ///
418    /// [`TimedOut`]: https://doc.rust-lang.org/nightly/std/io/enum.ErrorKind.html#variant.TimedOut
419    pub fn timeout(self, timeout: Duration) -> Self {
420        Self {
421            builder: self.builder.timeout(timeout),
422            limits: self.limits,
423        }
424    }
425
426    /// Set a timeout for establishing connections to a host.
427    ///
428    /// If not set, the [`default_client`] default of 90 seconds will be used.
429    pub fn connect_timeout(self, timeout: Duration) -> Self {
430        Self {
431            builder: self.builder.connect_timeout(timeout),
432            limits: self.limits,
433        }
434    }
435
436    /// Specify a maximum amount of time where transfer rate can go below a minimum speed limit.
437    ///
438    /// The `low_speed` limit is in bytes/s. No low-speed limit is configured by default.
439    pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
440        Self {
441            builder: self.builder.low_speed_timeout(low_speed, timeout),
442            limits: self.limits,
443        }
444    }
445
446    /// Set a policy for automatically following server redirects.
447    ///
448    /// If enabled the "Referer" header will be set automatically too.
449    ///
450    /// The [`default_client`] follows up-to 20 redirects.
451    pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
452        if !matches!(policy, RedirectPolicy::None) {
453            Self {
454                builder: self.builder.redirect_policy(policy).auto_referer(),
455                limits: self.limits,
456            }
457        } else {
458            Self {
459                builder: self.builder.redirect_policy(policy),
460                limits: self.limits,
461            }
462        }
463    }
464
465    /// Enable or disable automatic decompression of the response body.
466    ///
467    /// If enabled the "Accept-Encoding" will also be set automatically, if it was not set using [`header`].
468    ///
469    /// This is enabled by default.
470    ///
471    /// [`header`]: Self::header
472    pub fn auto_decompress(self, enabled: bool) -> Self {
473        Self {
474            builder: self.builder.automatic_decompression(enabled),
475            limits: self.limits,
476        }
477    }
478
479    /// Set a maximum upload speed for the request body, in bytes per second.
480    pub fn max_upload_speed(self, max: u64) -> Self {
481        Self {
482            builder: self.builder.max_upload_speed(max),
483            limits: self.limits,
484        }
485    }
486
487    /// Set a maximum download speed for the response body, in bytes per second.
488    pub fn max_download_speed(self, max: u64) -> Self {
489        Self {
490            builder: self.builder.max_download_speed(max),
491            limits: self.limits,
492        }
493    }
494
495    /// Set the maximum response content length allowed.
496    ///
497    /// If the `Content-Length` is present on the response and it exceeds this limit an error is
498    /// returned immediately, otherwise if [`require_length`] is not enabled an error will be returned
499    /// only when the downloaded body length exceeds the limit.
500    ///
501    /// No limit by default.
502    ///
503    /// [`require_length`]: Self::require_length
504    pub fn max_length(mut self, max: ByteLength) -> Self {
505        self.limits.max_length = Some(max);
506        self
507    }
508
509    /// Set if the `Content-Length` header must be present in the response.
510    pub fn require_length(mut self, require: bool) -> Self {
511        self.limits.require_length = require;
512        self
513    }
514
515    /// Enable or disable metrics collecting.
516    ///
517    /// When enabled you can get the information using the [`Response::metrics`] method.
518    ///
519    /// This is enabled by default.
520    pub fn metrics(self, enable: bool) -> Self {
521        Self {
522            builder: self.builder.metrics(enable),
523            limits: self.limits,
524        }
525    }
526
527    /// Build the request without a body.
528    pub fn build(self) -> Request {
529        self.body(()).unwrap()
530    }
531
532    /// Build the request with a body.
533    pub fn body(self, body: impl TryBody) -> Result<Request, Error> {
534        Ok(Request {
535            req: self.builder.body(body.try_body()?).unwrap(),
536            limits: self.limits,
537        })
538    }
539
540    /// Build the request with more custom build calls in the [inner builder].
541    ///
542    /// [inner builder]: isahc::http::request::Builder
543    pub fn build_custom<F>(self, custom: F) -> Result<Request, Error>
544    where
545        F: FnOnce(isahc::http::request::Builder) -> isahc::http::Result<isahc::Request<isahc::AsyncBody>>,
546    {
547        let req = custom(self.builder)?;
548        Ok(Request {
549            req: req.map(Body),
550            limits: self.limits,
551        })
552    }
553}
554
555/// Head parts from a split [`Response`].
556pub type ResponseParts = isahc::http::response::Parts;
557
558/// HTTP response.
559#[derive(Debug)]
560pub struct Response(isahc::Response<isahc::AsyncBody>);
561impl Response {
562    /// Returns the [`StatusCode`].
563    pub fn status(&self) -> StatusCode {
564        self.0.status()
565    }
566
567    /// Returns a reference to the associated header field map.
568    pub fn headers(&self) -> &header::HeaderMap<header::HeaderValue> {
569        self.0.headers()
570    }
571
572    /// Decode content-length value if it is present in the headers.
573    pub fn content_len(&self) -> Option<ByteLength> {
574        self.0.body().len().map(|l| ByteLength(l as usize))
575    }
576
577    /// Get the configured cookie jar used for persisting cookies from this response, if any.
578    ///
579    /// Only returns `None` if the [`default_client`] was replaced by one with cookies disabled.
580    pub fn cookie_jar(&self) -> Option<&CookieJar> {
581        self.0.cookie_jar()
582    }
583
584    /// Read the response body as a string.
585    pub async fn text(&mut self) -> std::io::Result<Txt> {
586        self.0.text().await.map(Txt::from)
587    }
588
589    /// Get the effective URI of this response. This value differs from the
590    /// original URI provided when making the request if at least one redirect
591    /// was followed.
592    pub fn effective_uri(&self) -> Option<&Uri> {
593        self.0.effective_uri()
594    }
595
596    /// Read the response body as raw bytes.
597    pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
598        Body::bytes_impl(self.0.body_mut()).await
599    }
600
601    /// Read some bytes from the body, returns how many bytes where read.
602    pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
603        BufReader::new(self.0.body_mut()).read(buf).await
604    }
605
606    /// Read the from the body to exactly fill the buffer.
607    pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
608        BufReader::new(self.0.body_mut()).read_exact(buf).await
609    }
610
611    /// Deserialize the response body as JSON.
612    pub async fn json<O>(&mut self) -> Result<O, serde_json::Error>
613    where
614        O: serde::de::DeserializeOwned + std::marker::Unpin,
615    {
616        self.0.json().await
617    }
618
619    /// Metrics for the task transfer.
620    ///
621    /// Metrics are enabled in the [`default_client`] and can be toggled for each request using the
622    /// [`RequestBuilder::metrics`] method. If disabled returns [`Metrics::zero`].
623    pub fn metrics(&self) -> Metrics {
624        self.0.metrics().map(Metrics::from_isahc).unwrap_or_else(Metrics::zero)
625    }
626
627    /// Drop the request without dropping the connection.
628    ///
629    /// This receives and discards any remaining bytes in the response stream. When a response
630    /// is dropped without finishing the connection is discarded so it cannot be reused for connections
631    /// older then HTTP/2.
632    ///
633    /// You should call this method before dropping if you expect the remaining bytes to be consumed quickly and
634    /// don't known that HTTP/2 or newer is being used.
635    pub async fn consume(&mut self) -> std::io::Result<()> {
636        self.0.consume().await
637    }
638
639    /// Create a response with the given status and text body message.
640    pub fn new_message(status: impl Into<StatusCode>, msg: impl Into<String>) -> Self {
641        let status = status.into();
642        let msg = msg.into().into_bytes();
643        let msg = futures_lite::io::Cursor::new(msg);
644        let mut r = isahc::Response::new(isahc::AsyncBody::from_reader(msg));
645        *r.status_mut() = status;
646        Self(r)
647    }
648
649    /// New response.
650    pub fn new(status: StatusCode, headers: header::HeaderMap<header::HeaderValue>, body: Body) -> Self {
651        let mut r = isahc::Response::new(body.0);
652        *r.status_mut() = status;
653        *r.headers_mut() = headers;
654        Self(r)
655    }
656
657    /// Consumes the response returning the head and body parts.
658    pub fn into_parts(self) -> (ResponseParts, Body) {
659        let (p, b) = self.0.into_parts();
660        (p, Body(b))
661    }
662
663    /// New response from given head and body.
664    pub fn from_parts(parts: ResponseParts, body: Body) -> Self {
665        Self(isahc::Response::from_parts(parts, body.0))
666    }
667}
668impl From<Response> for isahc::Response<isahc::AsyncBody> {
669    fn from(r: Response) -> Self {
670        r.0
671    }
672}
673
674/// HTTP request body.
675///
676/// Use [`TryBody`] to convert types to body.
677#[derive(Debug, Default)]
678pub struct Body(isahc::AsyncBody);
679impl Body {
680    /// Create a new empty body.
681    ///
682    /// An empty body represents the *absence* of a body, which is semantically different than the presence of a body of zero length.
683    pub fn empty() -> Body {
684        Body(isahc::AsyncBody::empty())
685    }
686
687    /// Create a new body from a potentially static byte buffer.
688    ///
689    /// The body will have a known length equal to the number of bytes given.
690    ///
691    /// This will try to prevent a copy if the type passed in can be re-used, otherwise the buffer
692    /// will be copied first. This method guarantees to not require a copy for the following types:
693    pub fn from_bytes_static(bytes: impl AsRef<[u8]> + 'static) -> Self {
694        Body(isahc::AsyncBody::from_bytes_static(bytes))
695    }
696
697    /// Create a streaming body of unknown length.
698    pub fn from_reader(read: impl AsyncRead + Send + Sync + 'static) -> Self {
699        Body(isahc::AsyncBody::from_reader(read))
700    }
701
702    /// Create a streaming body of with known length.
703    pub fn from_reader_sized(read: impl AsyncRead + Send + Sync + 'static, size: u64) -> Self {
704        Body(isahc::AsyncBody::from_reader_sized(read, size))
705    }
706
707    /// Report if this body is empty.
708    ///
709    /// This is not necessarily the same as checking for zero length, since HTTP message bodies are optional,
710    /// there is a semantic difference between the absence of a body and the presence of a zero-length body.
711    /// This method will only return `true` for the former.
712    pub fn is_empty(&self) -> bool {
713        self.0.is_empty()
714    }
715
716    /// Get the size of the body, if known.
717    pub fn len(&self) -> Option<u64> {
718        self.0.len()
719    }
720
721    /// If this body is repeatable, reset the body stream back to the start of the content.
722    ///
723    /// Returns false if the body cannot be reset.
724    pub fn reset(&mut self) -> bool {
725        self.0.reset()
726    }
727
728    /// Read the body as raw bytes.
729    pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
730        Self::bytes_impl(&mut self.0).await
731    }
732    async fn bytes_impl(body: &mut isahc::AsyncBody) -> std::io::Result<Vec<u8>> {
733        let cap = body.len().unwrap_or(1024);
734        let mut bytes = Vec::with_capacity(cap as usize);
735        super::io::copy(body, &mut bytes).await?;
736        Ok(bytes)
737    }
738
739    /// Read the body and try to convert to UTF-8.
740    ///
741    /// Consider using [`Response::text`], it uses the header encoding information if available.
742    pub async fn text_utf8(&mut self) -> Result<Txt, Box<dyn std::error::Error>> {
743        let bytes = self.bytes().await?;
744        let r = String::from_utf8(bytes)?;
745        Ok(Txt::from(r))
746    }
747
748    /// Read some bytes from the body, returns how many bytes where read.
749    pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
750        BufReader::new(&mut self.0).read(buf).await
751    }
752
753    /// Read the from the body to exactly fill the buffer.
754    pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
755        BufReader::new(&mut self.0).read_exact(buf).await
756    }
757}
758impl From<Body> for isahc::AsyncBody {
759    fn from(r: Body) -> Self {
760        r.0
761    }
762}
763impl From<isahc::AsyncBody> for Body {
764    fn from(r: isahc::AsyncBody) -> Self {
765        Body(r)
766    }
767}
768impl From<()> for Body {
769    fn from(body: ()) -> Self {
770        Body(body.into())
771    }
772}
773impl From<String> for Body {
774    fn from(body: String) -> Self {
775        Body(body.into())
776    }
777}
778impl From<Txt> for Body {
779    fn from(body: Txt) -> Self {
780        Body(String::from(body).into())
781    }
782}
783impl From<Vec<u8>> for Body {
784    fn from(body: Vec<u8>) -> Self {
785        Body(body.into())
786    }
787}
788impl From<&'_ [u8]> for Body {
789    fn from(body: &[u8]) -> Self {
790        body.to_vec().into()
791    }
792}
793impl From<&'_ str> for Body {
794    fn from(body: &str) -> Self {
795        body.as_bytes().into()
796    }
797}
798impl<T: Into<Self>> From<Option<T>> for Body {
799    fn from(body: Option<T>) -> Self {
800        match body {
801            Some(body) => body.into(),
802            None => Self::empty(),
803        }
804    }
805}
806impl AsyncRead for Body {
807    fn poll_read(
808        self: std::pin::Pin<&mut Self>,
809        cx: &mut std::task::Context<'_>,
810        buf: &mut [u8],
811    ) -> std::task::Poll<std::io::Result<usize>> {
812        Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
813    }
814}
815
816/// Send a GET request to the `uri`.
817///
818/// The [`default_client`] is used to send the request.
819pub async fn get(uri: impl TryUri) -> Result<Response, Error> {
820    default_client().get(uri).await
821}
822
823/// Send a GET request to the `uri` and read the response as a string.
824///
825/// The [`default_client`] is used to send the request.
826pub async fn get_txt(uri: impl TryUri) -> Result<Txt, Error> {
827    default_client().get_txt(uri).await
828}
829
830/// Send a GET request to the `uri` and read the response as raw bytes.
831///
832/// The [`default_client`] is used to send the request.
833pub async fn get_bytes(uri: impl TryUri) -> Result<Vec<u8>, Error> {
834    default_client().get_bytes(uri).await
835}
836
837/// Send a GET request to the `uri` and de-serializes the response.
838///
839/// The [`default_client`] is used to send the request.
840pub async fn get_json<O>(uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
841where
842    O: serde::de::DeserializeOwned + std::marker::Unpin,
843{
844    default_client().get_json(uri).await
845}
846
847/// Send a HEAD request to the `uri`.
848///
849/// The [`default_client`] is used to send the request.
850pub async fn head(uri: impl TryUri) -> Result<Response, Error> {
851    default_client().head(uri).await
852}
853
854/// Send a PUT request to the `uri` with a given request body.
855///
856/// The [`default_client`] is used to send the request.
857pub async fn put(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
858    default_client().put(uri, body).await
859}
860
861/// Send a POST request to the `uri` with a given request body.
862///
863/// The [`default_client`] is used to send the request.
864pub async fn post(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
865    default_client().post(uri, body).await
866}
867
868/// Send a DELETE request to the `uri`.
869///
870/// The [`default_client`] is used to send the request.
871pub async fn delete(uri: impl TryUri) -> Result<Response, Error> {
872    default_client().delete(uri).await
873}
874
875/// Send a custom [`Request`].
876///
877/// The [`default_client`] is used to send the request.
878pub async fn send(request: Request) -> Result<Response, Error> {
879    default_client().send(request).await
880}
881
882/// The [`Client`] used by the functions in this module.
883///
884/// You can replace the default client at the start of the process using [`set_default_client_init`].
885///
886/// # Defaults
887///
888/// The default client is created using [`Client::new`].
889///
890/// [`isahc`]: https://docs.rs/isahc
891pub fn default_client() -> &'static Client {
892    use once_cell::sync::Lazy;
893
894    static SHARED: Lazy<Client> = Lazy::new(|| {
895        let ci = mem::replace(&mut *CLIENT_INIT.lock(), ClientInit::Inited);
896        if let ClientInit::Set(init) = ci {
897            init()
898        } else {
899            // browser defaults
900            Client::new()
901        }
902    });
903    &SHARED
904}
905
/// Initialization state of the [`default_client`].
///
/// Written by [`set_default_client_init`] and consumed by [`default_client`] on the first use.
static CLIENT_INIT: Mutex<ClientInit> = const_mutex(ClientInit::None);

/// State stored in `CLIENT_INIT`.
enum ClientInit {
    /// No custom init function was set, this is the initial state.
    None,
    /// Custom init function that was set, but not called yet.
    Set(Box<dyn FnOnce() -> Client + Send>),
    /// The default client init already took the state, it cannot be replaced anymore.
    Inited,
}
913
914/// Set a custom initialization function for the [`default_client`].
915///
916/// The [`default_client`] is used by all functions in this module and is initialized on the first usage,
917/// you can use this function before any HTTP operation to replace the [`isahc`] client.
918///
919/// Returns an error if the [`default_client`] was already initialized.
920///
921/// [`isahc`]: https://docs.rs/isahc
922pub fn set_default_client_init<I>(init: I) -> Result<(), DefaultAlreadyInitedError>
923where
924    I: FnOnce() -> Client + Send + 'static,
925{
926    let mut ci = CLIENT_INIT.lock();
927    if let ClientInit::Inited = &*ci {
928        Err(DefaultAlreadyInitedError {})
929    } else {
930        *ci = ClientInit::Set(Box::new(init));
931        Ok(())
932    }
933}
934
/// Error returned by [`set_default_client_init`] when the default client was already initialized.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct DefaultAlreadyInitedError {}
impl fmt::Display for DefaultAlreadyInitedError {
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str("default client already initialized, can only set before first use")
    }
}
impl std::error::Error for DefaultAlreadyInitedError {}
945
/// Information about the state of an HTTP request.
///
/// Values can be created from the `isahc` metrics using [`Metrics::from_isahc`] or
/// started at zero using [`Metrics::zero`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct Metrics {
    /// Number of bytes uploaded / estimated total.
    ///
    /// The first value is the current count, the second is the total.
    pub upload_progress: (ByteLength, ByteLength),

    /// Average upload speed so far in bytes/second.
    pub upload_speed: ByteLength,

    /// Number of bytes downloaded / estimated total.
    ///
    /// The first value is the current count, the second is the total.
    pub download_progress: (ByteLength, ByteLength),

    /// Average download speed so far in bytes/second.
    pub download_speed: ByteLength,

    /// Total time from the start of the request until DNS name resolving was completed.
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub name_lookup_time: Duration,

    /// Amount of time taken to establish a connection to the server (not including TLS connection time).
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub connect_time: Duration,

    /// Amount of time spent on TLS handshakes.
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub secure_connect_time: Duration,

    /// Time it took from the start of the request until the first byte is either sent or received.
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub transfer_start_time: Duration,

    /// Amount of time spent performing the actual request transfer. The “transfer” includes
    /// both sending the request and receiving the response.
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub transfer_time: Duration,

    /// Total time for the entire request. This will continuously increase until the entire
    /// response body is consumed and completed.
    ///
    /// When a redirect is followed, the time from each request is added together.
    pub total_time: Duration,

    /// If automatic redirect following is enabled, the total time taken for all redirection steps
    /// including name lookup, connect, pre-transfer and transfer before final transaction was started.
    pub redirect_time: Duration,
}
998impl Metrics {
999    /// Init from `isahc::Metrics`.
1000    pub fn from_isahc(m: &isahc::Metrics) -> Self {
1001        Self {
1002            upload_progress: {
1003                let (c, t) = m.upload_progress();
1004                ((c as usize).bytes(), (t as usize).bytes())
1005            },
1006            upload_speed: (m.upload_speed().round() as usize).bytes(),
1007            download_progress: {
1008                let (c, t) = m.download_progress();
1009                ((c as usize).bytes(), (t as usize).bytes())
1010            },
1011            download_speed: (m.download_speed().round() as usize).bytes(),
1012            name_lookup_time: m.name_lookup_time(),
1013            connect_time: m.connect_time(),
1014            secure_connect_time: m.secure_connect_time(),
1015            transfer_start_time: m.transfer_start_time(),
1016            transfer_time: m.transfer_time(),
1017            total_time: m.total_time(),
1018            redirect_time: m.redirect_time(),
1019        }
1020    }
1021
1022    /// All zeros.
1023    pub fn zero() -> Self {
1024        Self {
1025            upload_progress: (0.bytes(), 0.bytes()),
1026            upload_speed: 0.bytes(),
1027            download_progress: (0.bytes(), 0.bytes()),
1028            download_speed: 0.bytes(),
1029            name_lookup_time: Duration::ZERO,
1030            connect_time: Duration::ZERO,
1031            secure_connect_time: Duration::ZERO,
1032            transfer_start_time: Duration::ZERO,
1033            transfer_time: Duration::ZERO,
1034            total_time: Duration::ZERO,
1035            redirect_time: Duration::ZERO,
1036        }
1037    }
1038}
1039impl From<isahc::Metrics> for Metrics {
1040    fn from(m: isahc::Metrics) -> Self {
1041        Metrics::from_isahc(&m)
1042    }
1043}
1044impl fmt::Display for Metrics {
1045    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1046        let mut ws = false; // written something
1047
1048        if self.upload_progress.0 != self.upload_progress.1 {
1049            write!(
1050                f,
1051                "↑ {} - {}, {}/s",
1052                self.upload_progress.0, self.upload_progress.1, self.upload_speed
1053            )?;
1054            ws = true;
1055        }
1056        if self.download_progress.0 != self.download_progress.1 {
1057            write!(
1058                f,
1059                "{}↓ {} - {}, {}/s",
1060                if ws { "\n" } else { "" },
1061                self.download_progress.0,
1062                self.download_progress.1,
1063                self.download_speed
1064            )?;
1065            ws = true;
1066        }
1067
1068        if !ws {
1069            if self.upload_progress.1.bytes() > 0 {
1070                write!(f, "↑ {}", self.upload_progress.1)?;
1071                ws = true;
1072            }
1073            if self.download_progress.1.bytes() > 0 {
1074                write!(f, "{}↓ {}", if ws { "\n" } else { "" }, self.download_progress.1)?;
1075                ws = true;
1076            }
1077
1078            if ws {
1079                write!(f, "\n{:?}", self.total_time)?;
1080            }
1081        }
1082
1083        Ok(())
1084    }
1085}
impl_from_and_into_var! {
    // Converts the metrics to a `Progress` status.
    //
    // The status message is the `Display` text of the metrics and the full `Metrics` value
    // is stored in the progress metadata under `METRICS_ID`.
    fn from(metrics: Metrics) -> Progress {
        // indeterminate unless a download or upload total is known.
        let mut status = Progress::indeterminate();
        if metrics.download_progress.1 > 0.bytes() {
            status = Progress::from_n_of(metrics.download_progress.0 .0, metrics.download_progress.1 .0);
        }
        if metrics.upload_progress.1 > 0.bytes() {
            let u_status = Progress::from_n_of(metrics.upload_progress.0 .0, metrics.upload_progress.1 .0);
            if status.is_indeterminate() {
                // only the upload total is known.
                status = u_status;
            } else {
                // both totals are known, combine the download status with the upload factor.
                // NOTE(review): exact combination rule is defined by `Progress::and_fct` - confirm
                status = status.and_fct(u_status.fct());
            }
        }
        status.with_msg(formatx!("{metrics}")).with_meta_mut(|mut m| {
            m.set(*METRICS_ID, metrics);
        })
    }
}
zng_state_map::static_id! {
    /// ID of the [`Metrics`] value in a [`Progress::with_meta`] metadata.
    ///
    /// The metadata is set when a [`Metrics`] value is converted to [`Progress`].
    pub static ref METRICS_ID: zng_state_map::StateId<Metrics>;
}
1109
/// HTTP client.
///
/// An HTTP client acts as a session for executing one or more HTTP requests.
pub struct Client {
    // inner isahc client that sends the requests.
    client: isahc::HttpClient,
    // optional cache database, no caching is done when `None`.
    cache: Option<Box<dyn CacheDb>>,
    // selects the cache mode of each request, see `Client::cache_mode`.
    cache_mode: Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>,
}
1118impl Default for Client {
1119    fn default() -> Self {
1120        Self::new()
1121    }
1122}
1123impl Clone for Client {
1124    fn clone(&self) -> Self {
1125        Client {
1126            client: self.client.clone(),
1127            cache: self.cache.as_ref().map(|b| b.clone_boxed()),
1128            cache_mode: self.cache_mode.clone(),
1129        }
1130    }
1131}
1132impl fmt::Debug for Client {
1133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1134        f.debug_struct("Client").finish_non_exhaustive()
1135    }
1136}
1137impl Client {
1138    /// New client with default config.
1139    ///
1140    /// This enables cookies, sets the `redirect_policy` with a limit of up-to 20 redirects and `auto_referer`, sets
1141    /// the `connect_timeout` to 90 seconds and enables `metrics`.
1142    pub fn new() -> Self {
1143        Client::builder()
1144            .cookies()
1145            .redirect_policy(RedirectPolicy::Limit(20))
1146            .connect_timeout(90.secs())
1147            .metrics(true)
1148            .build()
1149    }
1150
1151    /// Start a new [`ClientBuilder`] for creating a custom client.
1152    pub fn builder() -> ClientBuilder {
1153        ClientBuilder {
1154            builder: isahc::HttpClient::builder(),
1155            cache: None,
1156            cache_mode: None,
1157        }
1158    }
1159
1160    /// Gets the configured cookie-jar for this client, if cookies are enabled.
1161    pub fn cookie_jar(&self) -> Option<&CookieJar> {
1162        self.client.cookie_jar()
1163    }
1164
1165    /// Send a GET request to the `uri`.
1166    pub async fn get(&self, uri: impl TryUri) -> Result<Response, Error> {
1167        self.send(Request::get(uri)?.build()).await
1168    }
1169
1170    /// Send a GET request to the `uri` and read the response as a string.
1171    pub async fn get_txt(&self, uri: impl TryUri) -> Result<Txt, Error> {
1172        let mut r = self.get(uri).await?;
1173        let r = r.text().await?;
1174        Ok(r)
1175    }
1176
1177    /// Send a GET request to the `uri` and read the response as raw bytes.
1178    pub async fn get_bytes(&self, uri: impl TryUri) -> Result<Vec<u8>, Error> {
1179        let mut r = self.get(uri).await?;
1180        let r = r.bytes().await?;
1181        Ok(r)
1182    }
1183
1184    /// Send a GET request to the `uri` and de-serializes the response.
1185    pub async fn get_json<O>(&self, uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
1186    where
1187        O: serde::de::DeserializeOwned + std::marker::Unpin,
1188    {
1189        let mut r = self.get(uri).await?;
1190        let r = r.json::<O>().await?;
1191        Ok(r)
1192    }
1193
1194    /// Send a HEAD request to the `uri`.
1195    pub async fn head(&self, uri: impl TryUri) -> Result<Response, Error> {
1196        self.send(Request::head(uri)?.build()).await
1197    }
1198    /// Send a PUT request to the `uri` with a given request body.
1199    pub async fn put(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1200        self.send(Request::put(uri)?.body(body)?).await
1201    }
1202
1203    /// Send a POST request to the `uri` with a given request body.
1204    pub async fn post(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1205        self.send(Request::post(uri)?.body(body)?).await
1206    }
1207
1208    /// Send a DELETE request to the `uri`.
1209    pub async fn delete(&self, uri: impl TryUri) -> Result<Response, Error> {
1210        self.send(Request::delete(uri)?.build()).await
1211    }
1212
1213    /// Send a custom [`Request`].
1214    ///
1215    /// # Cache
1216    ///
1217    /// If the client has a [`cache`] and the request uses the `GET` method the result will be cached
1218    /// according with the [`cache_mode`] selected for the request.
1219    ///
1220    /// [`cache`]: Self::cache
1221    /// [`cache_mode`]: Self::cache_mode
1222    pub async fn send(&self, request: Request) -> Result<Response, Error> {
1223        if let Some(db) = &self.cache {
1224            match self.cache_mode(&request) {
1225                CacheMode::NoCache => {
1226                    let response = self.client.send_async(request.req).await?;
1227                    let response = request.limits.check(response)?;
1228                    Ok(Response(response))
1229                }
1230                CacheMode::Default => self.send_cache_default(&**db, request, 0).await,
1231                CacheMode::Permanent => self.send_cache_permanent(&**db, request, 0).await,
1232                CacheMode::Error(e) => Err(e),
1233            }
1234        } else {
1235            let response = self.client.send_async(request.req).await?;
1236            let response = request.limits.check(response)?;
1237            Ok(Response(response))
1238        }
1239    }
1240
    /// Sends the `request` using the `db` cache in the [`CacheMode::Default`] mode.
    ///
    /// If the cache has a fresh entry the response is created from it, if the entry is stale the request is sent
    /// with the parts provided by the policy and the entry is updated or replaced depending on the response. If there
    /// is no entry the request is sent and the response is stored when the new policy allows it.
    ///
    /// When the cache fails to provide or store a body the entry is removed and the request is retried, the
    /// `retry_count` tracks how many times this happened, after 3 retries the cache is skipped.
    #[async_recursion::async_recursion]
    async fn send_cache_default(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
        // give up on the cache after too many failures, just send the request.
        if retry_count == 3 {
            tracing::error!("retried cache 3 times, skipping cache");
            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;
            return Ok(Response(response));
        }

        let key = CacheKey::new(&request.req);
        if let Some(policy) = db.policy(&key).await {
            // there is a cache entry for the request
            match policy.before_request(&request.req) {
                BeforeRequest::Fresh(parts) => {
                    // entry is still valid, respond with the cached body.
                    if let Some(body) = db.body(&key).await {
                        let response = isahc::Response::from_parts(parts, body.0);
                        let response = request.limits.check(response)?;

                        Ok(Response(response))
                    } else {
                        // inconsistent entry, remove it and try again.
                        tracing::error!("cache returned policy but not body");
                        db.remove(&key).await;
                        self.send_cache_default(db, request, retry_count + 1).await
                    }
                }
                BeforeRequest::Stale { request: parts, matches } => {
                    if matches {
                        // send the request using the head provided by the policy and the original body.
                        let (_, body) = request.req.into_parts();
                        let request = Request {
                            req: isahc::Request::from_parts(parts, body),
                            limits: request.limits,
                        };
                        // copy of the request with an empty body, used by the policy and for retries.
                        let policy_request = request.clone_with(()).unwrap().req;
                        // can only auto-retry if the request body is known to be empty,
                        // the body is consumed by the send.
                        let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);

                        let response = self.client.send_async(request.req).await?;
                        let response = request.limits.check(response)?;

                        match policy.after_response(&policy_request, &response) {
                            AfterResponse::NotModified(policy, parts) => {
                                // cached body is still valid, respond with it and update the policy.
                                if let Some(body) = db.body(&key).await {
                                    let response = isahc::Response::from_parts(parts, body.0);

                                    db.set_policy(&key, policy).await;

                                    Ok(Response(response))
                                } else {
                                    tracing::error!("cache returned policy but not body");
                                    db.remove(&key).await;

                                    if no_req_body {
                                        self.send_cache_default(
                                            db,
                                            Request {
                                                req: policy_request,
                                                limits: request.limits,
                                            },
                                            retry_count + 1,
                                        )
                                        .await
                                    } else {
                                        Err(std::io::Error::new(
                                            std::io::ErrorKind::NotFound,
                                            "cache returned policy but not body, cannot auto-retry",
                                        )
                                        .into())
                                    }
                                }
                            }
                            AfterResponse::Modified(policy, parts) => {
                                if policy.should_store() {
                                    // replace the cache entry, the response body is the one returned by the cache.
                                    let (_, body) = response.into_parts();
                                    if let Some(body) = db.set(&key, policy, Body(body)).await {
                                        let response = isahc::Response::from_parts(parts, body.0);

                                        Ok(Response(response))
                                    } else {
                                        tracing::error!("cache db failed to store body");
                                        db.remove(&key).await;

                                        if no_req_body {
                                            self.send_cache_default(
                                                db,
                                                Request {
                                                    req: policy_request,
                                                    limits: request.limits,
                                                },
                                                retry_count + 1,
                                            )
                                            .await
                                        } else {
                                            Err(std::io::Error::new(
                                                std::io::ErrorKind::NotFound,
                                                "cache db failed to store body, cannot auto-retry",
                                            )
                                            .into())
                                        }
                                    }
                                } else {
                                    // new response must not be stored, remove the old entry.
                                    db.remove(&key).await;

                                    Ok(Response(response))
                                }
                            }
                        }
                    } else {
                        // entry is not for this request, remove it and send without cache.
                        tracing::error!("cache policy did not match request, {request:?}");
                        db.remove(&key).await;
                        let response = self.client.send_async(request.req).await?;
                        let response = request.limits.check(response)?;
                        Ok(Response(response))
                    }
                }
            }
        } else {
            // no cache entry, send the request and store the response if the new policy allows it.
            let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);
            let policy_request = request.clone_with(()).unwrap().req;

            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;

            let policy = CachePolicy::new(&policy_request, &response);

            if policy.should_store() {
                let (parts, body) = response.into_parts();

                if let Some(body) = db.set(&key, policy, Body(body)).await {
                    let response = isahc::Response::from_parts(parts, body.0);

                    Ok(Response(response))
                } else {
                    tracing::error!("cache db failed to store body");
                    db.remove(&key).await;

                    if no_req_body {
                        self.send_cache_default(
                            db,
                            Request {
                                req: policy_request,
                                limits: request.limits,
                            },
                            retry_count + 1,
                        )
                        .await
                    } else {
                        Err(std::io::Error::new(std::io::ErrorKind::NotFound, "cache db failed to store body, cannot auto-retry").into())
                    }
                }
            } else {
                Ok(Response(response))
            }
        }
    }
1393
    /// Sends the `request` using the `db` cache in the [`CacheMode::Permanent`] mode.
    ///
    /// If the cache has an entry with body the response is created from the cached body and the entry policy
    /// is replaced with a permanent policy, if it was not permanent already. If there is no entry the request
    /// is sent and the response is stored with a permanent policy.
    ///
    /// When the cache fails to provide or store a body the entry is removed and the request is retried, the
    /// `retry_count` tracks how many times this happened, after 3 retries the cache is skipped.
    #[async_recursion::async_recursion]
    async fn send_cache_permanent(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
        // give up on the cache after too many failures, just send the request.
        if retry_count == 3 {
            tracing::error!("retried cache 3 times, skipping cache");
            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;
            return Ok(Response(response));
        }

        let key = CacheKey::new(&request.req);
        if let Some(policy) = db.policy(&key).await {
            if let Some(body) = db.body(&key).await {
                match policy.before_request(&request.req) {
                    BeforeRequest::Fresh(p) => {
                        // respond with the cached body
                        let response = isahc::Response::from_parts(p, body.0);
                        let response = request.limits.check(response)?;

                        // upgrade the entry to permanent
                        if !policy.is_permanent() {
                            db.set_policy(&key, CachePolicy::new_permanent(&response)).await;
                        }

                        Ok(Response(response))
                    }
                    BeforeRequest::Stale { request: parts, .. } => {
                        // policy was not permanent when cached

                        let limits = request.limits.clone();

                        // send the request using the head provided by the policy and the original body.
                        let (_, req_body) = request.req.into_parts();
                        let request = isahc::Request::from_parts(parts, req_body);

                        let response = self.client.send_async(request).await?;
                        let response = limits.check(response)?;

                        // only the new response head is used, the new body is discarded.
                        let (parts, _) = response.into_parts();

                        // respond with the cached body
                        let response = isahc::Response::from_parts(parts, body.0);

                        db.set_policy(&key, CachePolicy::new_permanent(&response)).await;

                        Ok(Response(response))
                    }
                }
            } else {
                // inconsistent entry, remove it and try again.
                tracing::error!("cache returned policy but not body");
                db.remove(&key).await;
                self.send_cache_permanent(db, request, retry_count + 1).await
            }
        } else {
            // no cache entry, send the request and store the response as permanent.

            // can only auto-retry if the request body is known to be empty,
            // the body is consumed by the send.
            let backup_request = if request.req.body().len().map(|l| l == 0).unwrap_or(false) {
                Some(request.clone_with(()).unwrap())
            } else {
                None
            };

            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;
            let policy = CachePolicy::new_permanent(&response);

            let (parts, body) = response.into_parts();

            // the response body is the one returned by the cache.
            if let Some(body) = db.set(&key, policy, Body(body)).await {
                let response = isahc::Response::from_parts(parts, body.0);
                Ok(Response(response))
            } else {
                tracing::error!("cache db failed to store body");
                db.remove(&key).await;

                if let Some(request) = backup_request {
                    self.send_cache_permanent(db, request, retry_count + 1).await
                } else {
                    Err(std::io::Error::new(
                        std::io::ErrorKind::NotFound,
                        "cache db failed to store permanent body, cannot auto-retry",
                    )
                    .into())
                }
            }
        }
    }
1474
1475    /// Reference the cache used in this client.
1476    pub fn cache(&self) -> Option<&dyn CacheDb> {
1477        self.cache.as_deref()
1478    }
1479
1480    /// Returns the [`CacheMode`] that is used in this client if the request is made.
1481    pub fn cache_mode(&self, request: &Request) -> CacheMode {
1482        if self.cache.is_none() || request.method() != Method::GET {
1483            CacheMode::NoCache
1484        } else {
1485            (self.cache_mode)(request)
1486        }
1487    }
1488}
1489impl From<Client> for isahc::HttpClient {
1490    fn from(c: Client) -> Self {
1491        c.client
1492    }
1493}
1494impl From<isahc::HttpClient> for Client {
1495    fn from(client: isahc::HttpClient) -> Self {
1496        Self {
1497            client,
1498            cache: None,
1499            cache_mode: Arc::new(|_| CacheMode::default()),
1500        }
1501    }
1502}
1503
/// Builder that can be used to create a [`Client`].
///
/// Use [`Client::builder`] to start building.
///
/// # Examples
///
/// ```
/// use zng_task::http::*;
///
/// let client = Client::builder().metrics(true).build();
/// ```
pub struct ClientBuilder {
    // inner isahc builder, most config methods are forwarded to it.
    builder: isahc::HttpClientBuilder,
    // cache database for the client, no caching is done when `None`.
    cache: Option<Box<dyn CacheDb>>,
    // cache mode selector for the client, the built client selects `CacheMode::default()` when `None`.
    cache_mode: Option<Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>>,
}
1520impl Default for ClientBuilder {
1521    fn default() -> Self {
1522        Client::builder()
1523    }
1524}
1525impl ClientBuilder {
1526    /// New default builder.
1527    pub fn new() -> Self {
1528        Client::builder()
1529    }
1530
1531    /// Build the [`Client`] using the configured options.
1532    pub fn build(self) -> Client {
1533        Client {
1534            client: self.builder.build().unwrap(),
1535            cache: self.cache,
1536            cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1537        }
1538    }
1539
1540    /// Build the client with more custom build calls in the [inner builder].
1541    ///
1542    /// [inner builder]: isahc::HttpClientBuilder
1543    pub fn build_custom<F>(self, custom: F) -> Result<Client, Error>
1544    where
1545        F: FnOnce(isahc::HttpClientBuilder) -> Result<isahc::HttpClient, Error>,
1546    {
1547        custom(self.builder).map(|c| Client {
1548            client: c,
1549            cache: self.cache,
1550            cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1551        })
1552    }
1553
1554    /// Add a default header to be passed with every request.
1555    pub fn default_header(self, key: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
1556        Ok(Self {
1557            builder: self.builder.default_header(key.try_header_name()?, value.try_header_value()?),
1558            cache: self.cache,
1559            cache_mode: self.cache_mode,
1560        })
1561    }
1562
1563    /// Enable persistent cookie handling for all requests using this client using a shared cookie jar.
1564    pub fn cookies(self) -> Self {
1565        Self {
1566            builder: self.builder.cookies(),
1567            cache: self.cache,
1568            cache_mode: self.cache_mode,
1569        }
1570    }
1571
1572    /// Set a cookie jar to use to accept, store, and supply cookies for incoming responses and outgoing requests.
1573    ///
1574    /// Note that the [`default_client`] already has a cookie jar.
1575    pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
1576        Self {
1577            builder: self.builder.cookie_jar(cookie_jar),
1578            cache: self.cache,
1579            cache_mode: self.cache_mode,
1580        }
1581    }
1582
1583    /// Specify a maximum amount of time that a complete request/response cycle is allowed to
1584    /// take before being aborted. This includes DNS resolution, connecting to the server,
1585    /// writing the request, and reading the response.
1586    ///
1587    /// Note that this includes the response read operation, so if you get a response but don't
1588    /// read-it within this timeout you will get a [`TimedOut`] IO error.
1589    ///
1590    /// By default no timeout is used.
1591    ///
1592    /// [`TimedOut`]: https://doc.rust-lang.org/nightly/std/io/enum.ErrorKind.html#variant.TimedOut
1593    pub fn timeout(self, timeout: Duration) -> Self {
1594        Self {
1595            builder: self.builder.timeout(timeout),
1596            cache: self.cache,
1597            cache_mode: self.cache_mode,
1598        }
1599    }
1600
1601    /// Set a timeout for establishing connections to a host.
1602    ///
1603    /// If not set, the [`default_client`] default of 90 seconds will be used.
1604    pub fn connect_timeout(self, timeout: Duration) -> Self {
1605        Self {
1606            builder: self.builder.connect_timeout(timeout),
1607            cache: self.cache,
1608            cache_mode: self.cache_mode,
1609        }
1610    }
1611
1612    /// Specify a maximum amount of time where transfer rate can go below a minimum speed limit.
1613    ///
1614    /// The `low_speed` limit is in bytes/s. No low-speed limit is configured by default.
1615    pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
1616        Self {
1617            builder: self.builder.low_speed_timeout(low_speed, timeout),
1618            cache: self.cache,
1619            cache_mode: self.cache_mode,
1620        }
1621    }
1622
1623    /// Set a policy for automatically following server redirects.
1624    ///
1625    /// If enabled the "Referer" header will be set automatically too.
1626    pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
1627        if !matches!(policy, RedirectPolicy::None) {
1628            Self {
1629                builder: self.builder.redirect_policy(policy).auto_referer(),
1630                cache: self.cache,
1631                cache_mode: self.cache_mode,
1632            }
1633        } else {
1634            Self {
1635                builder: self.builder.redirect_policy(policy),
1636                cache: self.cache,
1637                cache_mode: self.cache_mode,
1638            }
1639        }
1640    }
1641
1642    /// Enable or disable automatic decompression of the response body.
1643    ///
1644    /// If enabled the "Accept-Encoding" will also be set automatically, if it was not set using [`default_header`].
1645    ///
1646    /// This is enabled by default.
1647    ///
1648    /// [`default_header`]: Self::default_header
1649    pub fn auto_decompress(self, enabled: bool) -> Self {
1650        Self {
1651            builder: self.builder.automatic_decompression(enabled),
1652            cache: self.cache,
1653            cache_mode: self.cache_mode,
1654        }
1655    }
1656
1657    /// Set a maximum upload speed for the request body, in bytes per second.
1658    pub fn max_upload_speed(self, max: u64) -> Self {
1659        Self {
1660            builder: self.builder.max_upload_speed(max),
1661            cache: self.cache,
1662            cache_mode: self.cache_mode,
1663        }
1664    }
1665
1666    /// Set a maximum download speed for the response body, in bytes per second.
1667    pub fn max_download_speed(self, max: u64) -> Self {
1668        Self {
1669            builder: self.builder.max_download_speed(max),
1670            cache: self.cache,
1671            cache_mode: self.cache_mode,
1672        }
1673    }
1674
1675    /// Enable or disable metrics collecting.
1676    ///
1677    /// When enabled you can get the information using the [`Response::metrics`] method.
1678    ///
1679    /// This is enabled by default.
1680    pub fn metrics(self, enable: bool) -> Self {
1681        Self {
1682            builder: self.builder.metrics(enable),
1683            cache: self.cache,
1684            cache_mode: self.cache_mode,
1685        }
1686    }
1687
1688    /// Sets the [`CacheDb`] to use.
1689    ///
1690    /// Caching is only enabled if there is a DB, no caching is done by default.
1691    pub fn cache(self, cache: impl CacheDb) -> Self {
1692        Self {
1693            builder: self.builder,
1694            cache: Some(Box::new(cache)),
1695            cache_mode: self.cache_mode,
1696        }
1697    }
1698
1699    /// Sets the [`CacheMode`] selector.
1700    ///
1701    /// The `selector` closure is called for every cacheable request before it is made, it
1702    /// must return a [`CacheMode`] value that configures how the [`cache`] is used.
1703    ///
1704    /// Note that the closure is only called if a [`cache`] is set.
1705    ///
1706    /// [`cache`]: Self::cache
1707    pub fn cache_mode(self, selector: impl Fn(&Request) -> CacheMode + Send + Sync + 'static) -> Self {
1708        Self {
1709            builder: self.builder,
1710            cache: self.cache,
1711            cache_mode: Some(Arc::new(selector)),
1712        }
1713    }
1714}
1715
/// An error encountered while sending an HTTP request or receiving an HTTP response using a [`Client`].
///
/// Values of this type can be converted from [`isahc::Error`], [`isahc::http::Error`] and
/// [`std::io::Error`]. The length validation variants are recovered from the payload of the
/// underlying error when it is present, all other errors are wrapped in [`Error::Client`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum Error {
    /// Error from the HTTP client.
    ///
    /// This is the only variant that provides an error [`source`].
    ///
    /// [`source`]: std::error::Error::source
    Client(isahc::Error),
    /// Error when [`max_length`] validation fails at the header or after streaming download.
    ///
    /// [`max_length`]: RequestBuilder::max_length
    MaxLength {
        /// The `Content-Length` header value, if it was set.
        content_length: Option<ByteLength>,
        /// The maximum allowed length.
        max_length: ByteLength,
    },
    /// Error when [`require_length`] is set, but a response was sent without the `Content-Length` header.
    ///
    /// [`require_length`]: RequestBuilder::require_length
    RequireLength,
}
1736impl StdError for Error {
1737    fn source(&self) -> Option<&(dyn StdError + 'static)> {
1738        match self {
1739            Error::Client(e) => Some(e),
1740            _ => None,
1741        }
1742    }
1743}
1744impl From<isahc::Error> for Error {
1745    fn from(e: isahc::Error) -> Self {
1746        if let Some(e) = e
1747            .source()
1748            .and_then(|e| e.downcast_ref::<std::io::Error>())
1749            .and_then(|e| e.get_ref())
1750        {
1751            if let Some(e) = e.downcast_ref::<MaxLengthError>() {
1752                return Error::MaxLength {
1753                    content_length: e.0,
1754                    max_length: e.1,
1755                };
1756            }
1757            if e.downcast_ref::<RequireLengthError>().is_some() {
1758                return Error::RequireLength;
1759            }
1760        }
1761        Error::Client(e)
1762    }
1763}
1764impl From<isahc::http::Error> for Error {
1765    fn from(e: isahc::http::Error) -> Self {
1766        isahc::Error::from(e).into()
1767    }
1768}
1769impl From<std::io::Error> for Error {
1770    fn from(e: std::io::Error) -> Self {
1771        isahc::Error::from(e).into()
1772    }
1773}
1774impl fmt::Display for Error {
1775    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1776        match self {
1777            Error::Client(e) => write!(f, "{e}"),
1778            Error::MaxLength {
1779                content_length,
1780                max_length,
1781            } => write!(f, "{}", MaxLengthError(*content_length, *max_length)),
1782            Error::RequireLength => write!(f, "{}", RequireLengthError {}),
1783        }
1784    }
1785}
1786
1787// Error types smuggled inside an io::Error inside the isahc::Error.
1788
1789#[derive(Debug)]
1790struct MaxLengthError(Option<ByteLength>, ByteLength);
1791impl fmt::Display for MaxLengthError {
1792    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1793        if let Some(l) = self.0 {
1794            write!(f, "content-length of {l} exceeds limit of {}", self.1)
1795        } else {
1796            write!(f, "download reached limit of {}", self.1)
1797        }
1798    }
1799}
1800impl StdError for MaxLengthError {}
1801
/// Required length validation failure.
#[derive(Debug)]
#[non_exhaustive]
struct RequireLengthError {}
impl fmt::Display for RequireLengthError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fixed message, there is nothing to interpolate.
        f.write_str("content-length is required")
    }
}
impl StdError for RequireLengthError {}