1#![cfg(feature = "http")]
2#![expect(clippy::type_complexity)]
4
5mod cache;
25mod util;
26
27pub use cache::*;
28use zng_var::impl_from_and_into_var;
29
30use std::convert::TryFrom;
31use std::error::Error as StdError;
32use std::pin::Pin;
33use std::sync::Arc;
34use std::time::Duration;
35use std::{fmt, mem};
36
37use crate::Progress;
38
39use super::io::AsyncRead;
40
41use isahc::config::Configurable;
42pub use isahc::config::RedirectPolicy;
43pub use isahc::cookies::{Cookie, CookieJar};
44pub use isahc::http::{Method, StatusCode, Uri, header, uri};
45
46use futures_lite::io::{AsyncReadExt, BufReader};
47use isahc::{AsyncReadResponseExt, ResponseExt};
48use parking_lot::{Mutex, const_mutex};
49
50use zng_txt::{Txt, formatx};
51use zng_unit::*;
52
53#[diagnostic::on_unimplemented(note = "`TryUri` is implemented for all `T` where `Uri: TryFrom<T, Error: Into<isahc::http::Error>>`")]
57pub trait TryUri {
58    fn try_uri(self) -> Result<Uri, Error>;
60}
61impl<U> TryUri for U
62where
63    Uri: TryFrom<U>,
64    <Uri as TryFrom<U>>::Error: Into<isahc::http::Error>,
65{
66    fn try_uri(self) -> Result<Uri, Error> {
67        Uri::try_from(self).map_err(|e| e.into().into())
68    }
69}
70
71#[diagnostic::on_unimplemented(note = "`TryMethod` is implemented for all `T` where `Method: TryFrom<T, Error: Into<isahc::http::Error>>`")]
75pub trait TryMethod {
76    fn try_method(self) -> Result<Method, Error>;
78}
79impl<U> TryMethod for U
80where
81    Method: TryFrom<U>,
82    <isahc::http::Method as TryFrom<U>>::Error: Into<isahc::http::Error>,
83{
84    fn try_method(self) -> Result<Method, Error> {
85        Method::try_from(self).map_err(|e| e.into().into())
86    }
87}
88
89#[diagnostic::on_unimplemented(note = "`TryBody` is implemented for all `T` where `Body: TryFrom<T, Error: Into<isahc::http::Error>>`")]
94pub trait TryBody {
95    fn try_body(self) -> Result<Body, Error>;
97}
98impl<U> TryBody for U
99where
100    isahc::AsyncBody: TryFrom<U>,
101    <isahc::AsyncBody as TryFrom<U>>::Error: Into<isahc::http::Error>,
102{
103    fn try_body(self) -> Result<Body, Error> {
104        match isahc::AsyncBody::try_from(self) {
105            Ok(r) => Ok(Body(r)),
106            Err(e) => Err(e.into().into()),
107        }
108    }
109}
110
111#[diagnostic::on_unimplemented(
116    note = "`TryHeaderName` is implemented for all `T` where `HeaderName: TryFrom<T, Error: Into<isahc::http::Error>>`"
117)]
118pub trait TryHeaderName {
119    fn try_header_name(self) -> Result<header::HeaderName, Error>;
121}
122impl<U> TryHeaderName for U
123where
124    header::HeaderName: TryFrom<U>,
125    <header::HeaderName as TryFrom<U>>::Error: Into<isahc::http::Error>,
126{
127    fn try_header_name(self) -> Result<header::HeaderName, Error> {
128        header::HeaderName::try_from(self).map_err(|e| e.into().into())
129    }
130}
131
132#[diagnostic::on_unimplemented(
137    note = "`TryHeaderValue` is implemented for all `T` where `HeaderValue: TryFrom<T, Error: Into<isahc::http::Error>>`"
138)]
139pub trait TryHeaderValue {
140    fn try_header_value(self) -> Result<header::HeaderValue, Error>;
142}
143impl<U> TryHeaderValue for U
144where
145    header::HeaderValue: TryFrom<U>,
146    <header::HeaderValue as TryFrom<U>>::Error: Into<isahc::http::Error>,
147{
148    fn try_header_value(self) -> Result<header::HeaderValue, Error> {
149        header::HeaderValue::try_from(self).map_err(|e| e.into().into())
150    }
151}
152
153#[derive(Debug)]
157pub struct Request {
158    req: isahc::Request<Body>,
159    limits: ResponseLimits,
160}
161impl Request {
162    pub fn builder() -> RequestBuilder {
183        RequestBuilder::start(isahc::Request::builder())
184    }
185
186    pub fn get(uri: impl TryUri) -> Result<RequestBuilder, Error> {
198        Ok(RequestBuilder::start(isahc::Request::get(uri.try_uri()?)))
199    }
200
201    pub fn put(uri: impl TryUri) -> Result<RequestBuilder, Error> {
215        Ok(RequestBuilder::start(isahc::Request::put(uri.try_uri()?)))
216    }
217
218    pub fn post(uri: impl TryUri) -> Result<RequestBuilder, Error> {
232        Ok(RequestBuilder::start(isahc::Request::post(uri.try_uri()?)))
233    }
234
235    pub fn delete(uri: impl TryUri) -> Result<RequestBuilder, Error> {
249        Ok(RequestBuilder::start(isahc::Request::delete(uri.try_uri()?)))
250    }
251
252    pub fn patch(uri: impl TryUri) -> Result<RequestBuilder, Error> {
266        Ok(RequestBuilder::start(isahc::Request::patch(uri.try_uri()?)))
267    }
268
269    pub fn head(uri: impl TryUri) -> Result<RequestBuilder, Error> {
281        Ok(RequestBuilder::start(isahc::Request::head(uri.try_uri()?)))
282    }
283
284    pub fn uri(&self) -> &Uri {
286        self.req.uri()
287    }
288
289    pub fn method(&self) -> &Method {
291        self.req.method()
292    }
293
294    pub fn headers(&self) -> &header::HeaderMap {
296        self.req.headers()
297    }
298
299    pub fn clone_with(&self, body: impl TryBody) -> Result<Self, Error> {
301        let body = body.try_body()?;
302
303        let mut req = isahc::Request::new(body);
304        *req.method_mut() = self.req.method().clone();
305        *req.uri_mut() = self.req.uri().clone();
306        *req.version_mut() = self.req.version();
307        let headers = req.headers_mut();
308        for (name, value) in self.headers() {
309            headers.insert(name.clone(), value.clone());
310        }
311
312        Ok(Self {
313            req,
314            limits: self.limits.clone(),
315        })
316    }
317}
318
319#[derive(Debug, Default, Clone)]
320struct ResponseLimits {
321    max_length: Option<ByteLength>,
322    require_length: bool,
323}
324impl ResponseLimits {
325    fn check(&self, response: isahc::Response<isahc::AsyncBody>) -> Result<isahc::Response<isahc::AsyncBody>, Error> {
326        if self.require_length || self.max_length.is_some() {
327            let response = Response(response);
328            if let Some(len) = response.content_len() {
329                if let Some(max) = self.max_length
330                    && max < len
331                {
332                    return Err(Error::MaxLength {
333                        content_length: Some(len),
334                        max_length: max,
335                    });
336                }
337            } else if self.require_length {
338                return Err(Error::RequireLength);
339            }
340
341            if let Some(max) = self.max_length {
342                let (parts, body) = response.0.into_parts();
343                let response = isahc::Response::from_parts(
344                    parts,
345                    isahc::AsyncBody::from_reader(super::io::ReadLimited::new(body, max, move || {
346                        std::io::Error::new(std::io::ErrorKind::InvalidData, MaxLengthError(None, max))
347                    })),
348                );
349
350                Ok(response)
351            } else {
352                Ok(response.0)
353            }
354        } else {
355            Ok(response)
356        }
357    }
358}
359
360#[derive(Debug)]
364pub struct RequestBuilder {
365    builder: isahc::http::request::Builder,
366    limits: ResponseLimits,
367}
368impl Default for RequestBuilder {
369    fn default() -> Self {
370        Request::builder()
371    }
372}
373impl RequestBuilder {
374    pub fn new() -> Self {
376        Request::builder()
377    }
378
379    fn start(builder: isahc::http::request::Builder) -> Self {
380        Self {
381            builder,
382            limits: ResponseLimits::default(),
383        }
384    }
385
386    pub fn method(self, method: impl TryMethod) -> Result<Self, Error> {
388        Ok(Self {
389            builder: self.builder.method(method.try_method()?),
390            limits: self.limits,
391        })
392    }
393
394    pub fn uri(self, uri: impl TryUri) -> Result<Self, Error> {
396        Ok(Self {
397            builder: self.builder.uri(uri.try_uri()?),
398            limits: self.limits,
399        })
400    }
401
402    pub fn header(self, name: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
404        Ok(Self {
405            builder: self.builder.header(name.try_header_name()?, value.try_header_value()?),
406            limits: self.limits,
407        })
408    }
409
410    pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
414        Self {
415            builder: self.builder.cookie_jar(cookie_jar),
416            limits: self.limits,
417        }
418    }
419
420    pub fn timeout(self, timeout: Duration) -> Self {
431        Self {
432            builder: self.builder.timeout(timeout),
433            limits: self.limits,
434        }
435    }
436
437    pub fn connect_timeout(self, timeout: Duration) -> Self {
441        Self {
442            builder: self.builder.connect_timeout(timeout),
443            limits: self.limits,
444        }
445    }
446
447    pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
451        Self {
452            builder: self.builder.low_speed_timeout(low_speed, timeout),
453            limits: self.limits,
454        }
455    }
456
457    pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
463        if !matches!(policy, RedirectPolicy::None) {
464            Self {
465                builder: self.builder.redirect_policy(policy).auto_referer(),
466                limits: self.limits,
467            }
468        } else {
469            Self {
470                builder: self.builder.redirect_policy(policy),
471                limits: self.limits,
472            }
473        }
474    }
475
476    pub fn auto_decompress(self, enabled: bool) -> Self {
484        Self {
485            builder: self.builder.automatic_decompression(enabled),
486            limits: self.limits,
487        }
488    }
489
490    pub fn max_upload_speed(self, max: u64) -> Self {
492        Self {
493            builder: self.builder.max_upload_speed(max),
494            limits: self.limits,
495        }
496    }
497
498    pub fn max_download_speed(self, max: u64) -> Self {
500        Self {
501            builder: self.builder.max_download_speed(max),
502            limits: self.limits,
503        }
504    }
505
506    pub fn max_length(mut self, max: ByteLength) -> Self {
516        self.limits.max_length = Some(max);
517        self
518    }
519
520    pub fn require_length(mut self, require: bool) -> Self {
522        self.limits.require_length = require;
523        self
524    }
525
526    pub fn metrics(self, enable: bool) -> Self {
532        Self {
533            builder: self.builder.metrics(enable),
534            limits: self.limits,
535        }
536    }
537
538    pub fn build(self) -> Request {
540        self.body(()).unwrap()
541    }
542
543    pub fn body(self, body: impl TryBody) -> Result<Request, Error> {
545        Ok(Request {
546            req: self.builder.body(body.try_body()?).unwrap(),
547            limits: self.limits,
548        })
549    }
550
551    pub fn build_custom<F>(self, custom: F) -> Result<Request, Error>
555    where
556        F: FnOnce(isahc::http::request::Builder) -> isahc::http::Result<isahc::Request<isahc::AsyncBody>>,
557    {
558        let req = custom(self.builder)?;
559        Ok(Request {
560            req: req.map(Body),
561            limits: self.limits,
562        })
563    }
564}
565
566pub type ResponseParts = isahc::http::response::Parts;
568
569#[derive(Debug)]
571pub struct Response(isahc::Response<isahc::AsyncBody>);
572impl Response {
573    pub fn status(&self) -> StatusCode {
575        self.0.status()
576    }
577
578    pub fn headers(&self) -> &header::HeaderMap<header::HeaderValue> {
580        self.0.headers()
581    }
582
583    pub fn content_len(&self) -> Option<ByteLength> {
585        self.0.body().len().map(|l| ByteLength(l as usize))
586    }
587
588    pub fn cookie_jar(&self) -> Option<&CookieJar> {
592        self.0.cookie_jar()
593    }
594
595    pub async fn text(&mut self) -> std::io::Result<Txt> {
597        self.0.text().await.map(Txt::from)
598    }
599
600    pub fn effective_uri(&self) -> Option<&Uri> {
604        self.0.effective_uri()
605    }
606
607    pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
609        Body::bytes_impl(self.0.body_mut()).await
610    }
611
612    pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
614        BufReader::new(self.0.body_mut()).read(buf).await
615    }
616
617    pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
619        BufReader::new(self.0.body_mut()).read_exact(buf).await
620    }
621
622    pub async fn json<O>(&mut self) -> Result<O, serde_json::Error>
624    where
625        O: serde::de::DeserializeOwned + std::marker::Unpin,
626    {
627        self.0.json().await
628    }
629
630    pub fn metrics(&self) -> Metrics {
635        self.0.metrics().map(Metrics::from_isahc).unwrap_or_else(Metrics::zero)
636    }
637
638    pub async fn consume(&mut self) -> std::io::Result<()> {
647        self.0.consume().await
648    }
649
650    pub fn new_message(status: impl Into<StatusCode>, msg: impl Into<String>) -> Self {
652        let status = status.into();
653        let msg = msg.into().into_bytes();
654        let msg = futures_lite::io::Cursor::new(msg);
655        let mut r = isahc::Response::new(isahc::AsyncBody::from_reader(msg));
656        *r.status_mut() = status;
657        Self(r)
658    }
659
660    pub fn new(status: StatusCode, headers: header::HeaderMap<header::HeaderValue>, body: Body) -> Self {
662        let mut r = isahc::Response::new(body.0);
663        *r.status_mut() = status;
664        *r.headers_mut() = headers;
665        Self(r)
666    }
667
668    pub fn into_parts(self) -> (ResponseParts, Body) {
670        let (p, b) = self.0.into_parts();
671        (p, Body(b))
672    }
673
674    pub fn from_parts(parts: ResponseParts, body: Body) -> Self {
676        Self(isahc::Response::from_parts(parts, body.0))
677    }
678}
679impl From<Response> for isahc::Response<isahc::AsyncBody> {
680    fn from(r: Response) -> Self {
681        r.0
682    }
683}
684
685#[derive(Debug, Default)]
689pub struct Body(isahc::AsyncBody);
690impl Body {
691    pub fn empty() -> Body {
695        Body(isahc::AsyncBody::empty())
696    }
697
698    pub fn from_bytes_static(bytes: impl AsRef<[u8]> + 'static) -> Self {
705        Body(isahc::AsyncBody::from_bytes_static(bytes))
706    }
707
708    pub fn from_reader(read: impl AsyncRead + Send + Sync + 'static) -> Self {
710        Body(isahc::AsyncBody::from_reader(read))
711    }
712
713    pub fn from_reader_sized(read: impl AsyncRead + Send + Sync + 'static, size: u64) -> Self {
715        Body(isahc::AsyncBody::from_reader_sized(read, size))
716    }
717
718    pub fn is_empty(&self) -> bool {
724        self.0.is_empty()
725    }
726
727    pub fn len(&self) -> Option<u64> {
729        self.0.len()
730    }
731
732    pub fn reset(&mut self) -> bool {
736        self.0.reset()
737    }
738
739    pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
741        Self::bytes_impl(&mut self.0).await
742    }
743    async fn bytes_impl(body: &mut isahc::AsyncBody) -> std::io::Result<Vec<u8>> {
744        let cap = body.len().unwrap_or(1024);
745        let mut bytes = Vec::with_capacity(cap as usize);
746        super::io::copy(body, &mut bytes).await?;
747        Ok(bytes)
748    }
749
750    pub async fn text_utf8(&mut self) -> Result<Txt, Box<dyn std::error::Error>> {
754        let bytes = self.bytes().await?;
755        let r = String::from_utf8(bytes)?;
756        Ok(Txt::from(r))
757    }
758
759    pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
761        BufReader::new(&mut self.0).read(buf).await
762    }
763
764    pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
766        BufReader::new(&mut self.0).read_exact(buf).await
767    }
768}
769impl From<Body> for isahc::AsyncBody {
770    fn from(r: Body) -> Self {
771        r.0
772    }
773}
774impl From<isahc::AsyncBody> for Body {
775    fn from(r: isahc::AsyncBody) -> Self {
776        Body(r)
777    }
778}
779impl From<()> for Body {
780    fn from(body: ()) -> Self {
781        Body(body.into())
782    }
783}
784impl From<String> for Body {
785    fn from(body: String) -> Self {
786        Body(body.into())
787    }
788}
789impl From<Txt> for Body {
790    fn from(body: Txt) -> Self {
791        Body(String::from(body).into())
792    }
793}
794impl From<Vec<u8>> for Body {
795    fn from(body: Vec<u8>) -> Self {
796        Body(body.into())
797    }
798}
799impl From<&'_ [u8]> for Body {
800    fn from(body: &[u8]) -> Self {
801        body.to_vec().into()
802    }
803}
804impl From<&'_ str> for Body {
805    fn from(body: &str) -> Self {
806        body.as_bytes().into()
807    }
808}
809impl<T: Into<Self>> From<Option<T>> for Body {
810    fn from(body: Option<T>) -> Self {
811        match body {
812            Some(body) => body.into(),
813            None => Self::empty(),
814        }
815    }
816}
817impl AsyncRead for Body {
818    fn poll_read(
819        self: std::pin::Pin<&mut Self>,
820        cx: &mut std::task::Context<'_>,
821        buf: &mut [u8],
822    ) -> std::task::Poll<std::io::Result<usize>> {
823        Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
824    }
825}
826
827pub async fn get(uri: impl TryUri) -> Result<Response, Error> {
831    default_client().get(uri).await
832}
833
834pub async fn get_txt(uri: impl TryUri) -> Result<Txt, Error> {
838    default_client().get_txt(uri).await
839}
840
841pub async fn get_bytes(uri: impl TryUri) -> Result<Vec<u8>, Error> {
845    default_client().get_bytes(uri).await
846}
847
848pub async fn get_json<O>(uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
852where
853    O: serde::de::DeserializeOwned + std::marker::Unpin,
854{
855    default_client().get_json(uri).await
856}
857
858pub async fn head(uri: impl TryUri) -> Result<Response, Error> {
862    default_client().head(uri).await
863}
864
865pub async fn put(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
869    default_client().put(uri, body).await
870}
871
872pub async fn post(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
876    default_client().post(uri, body).await
877}
878
879pub async fn delete(uri: impl TryUri) -> Result<Response, Error> {
883    default_client().delete(uri).await
884}
885
886pub async fn send(request: Request) -> Result<Response, Error> {
890    default_client().send(request).await
891}
892
893pub fn default_client() -> &'static Client {
903    use once_cell::sync::Lazy;
904
905    static SHARED: Lazy<Client> = Lazy::new(|| {
906        let ci = mem::replace(&mut *CLIENT_INIT.lock(), ClientInit::Inited);
907        if let ClientInit::Set(init) = ci {
908            init()
909        } else {
910            Client::new()
912        }
913    });
914    &SHARED
915}
916
917static CLIENT_INIT: Mutex<ClientInit> = const_mutex(ClientInit::None);
918
919enum ClientInit {
920    None,
921    Set(Box<dyn FnOnce() -> Client + Send>),
922    Inited,
923}
924
925pub fn set_default_client_init<I>(init: I) -> Result<(), DefaultAlreadyInitedError>
934where
935    I: FnOnce() -> Client + Send + 'static,
936{
937    let mut ci = CLIENT_INIT.lock();
938    if let ClientInit::Inited = &*ci {
939        Err(DefaultAlreadyInitedError {})
940    } else {
941        *ci = ClientInit::Set(Box::new(init));
942        Ok(())
943    }
944}
945
946#[derive(Debug, Clone, Copy)]
948#[non_exhaustive]
949pub struct DefaultAlreadyInitedError {}
950impl fmt::Display for DefaultAlreadyInitedError {
951    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
952        write!(f, "default client already initialized, can only set before first use")
953    }
954}
955impl std::error::Error for DefaultAlreadyInitedError {}
956
957#[derive(Debug, Clone, PartialEq, Eq)]
959#[non_exhaustive]
960pub struct Metrics {
961    pub upload_progress: (ByteLength, ByteLength),
963
964    pub upload_speed: ByteLength,
966
967    pub download_progress: (ByteLength, ByteLength),
969
970    pub download_speed: ByteLength,
972
973    pub name_lookup_time: Duration,
977
978    pub connect_time: Duration,
982
983    pub secure_connect_time: Duration,
987
988    pub transfer_start_time: Duration,
992
993    pub transfer_time: Duration,
998
999    pub total_time: Duration,
1004
1005    pub redirect_time: Duration,
1008}
1009impl Metrics {
1010    pub fn from_isahc(m: &isahc::Metrics) -> Self {
1012        Self {
1013            upload_progress: {
1014                let (c, t) = m.upload_progress();
1015                ((c as usize).bytes(), (t as usize).bytes())
1016            },
1017            upload_speed: (m.upload_speed().round() as usize).bytes(),
1018            download_progress: {
1019                let (c, t) = m.download_progress();
1020                ((c as usize).bytes(), (t as usize).bytes())
1021            },
1022            download_speed: (m.download_speed().round() as usize).bytes(),
1023            name_lookup_time: m.name_lookup_time(),
1024            connect_time: m.connect_time(),
1025            secure_connect_time: m.secure_connect_time(),
1026            transfer_start_time: m.transfer_start_time(),
1027            transfer_time: m.transfer_time(),
1028            total_time: m.total_time(),
1029            redirect_time: m.redirect_time(),
1030        }
1031    }
1032
1033    pub fn zero() -> Self {
1035        Self {
1036            upload_progress: (0.bytes(), 0.bytes()),
1037            upload_speed: 0.bytes(),
1038            download_progress: (0.bytes(), 0.bytes()),
1039            download_speed: 0.bytes(),
1040            name_lookup_time: Duration::ZERO,
1041            connect_time: Duration::ZERO,
1042            secure_connect_time: Duration::ZERO,
1043            transfer_start_time: Duration::ZERO,
1044            transfer_time: Duration::ZERO,
1045            total_time: Duration::ZERO,
1046            redirect_time: Duration::ZERO,
1047        }
1048    }
1049}
1050impl From<isahc::Metrics> for Metrics {
1051    fn from(m: isahc::Metrics) -> Self {
1052        Metrics::from_isahc(&m)
1053    }
1054}
1055impl fmt::Display for Metrics {
1056    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1057        let mut ws = false; if self.upload_progress.0 != self.upload_progress.1 {
1060            write!(
1061                f,
1062                "↑ {} - {}, {}/s",
1063                self.upload_progress.0, self.upload_progress.1, self.upload_speed
1064            )?;
1065            ws = true;
1066        }
1067        if self.download_progress.0 != self.download_progress.1 {
1068            write!(
1069                f,
1070                "{}↓ {} - {}, {}/s",
1071                if ws { "\n" } else { "" },
1072                self.download_progress.0,
1073                self.download_progress.1,
1074                self.download_speed
1075            )?;
1076            ws = true;
1077        }
1078
1079        if !ws {
1080            if self.upload_progress.1.bytes() > 0 {
1081                write!(f, "↑ {}", self.upload_progress.1)?;
1082                ws = true;
1083            }
1084            if self.download_progress.1.bytes() > 0 {
1085                write!(f, "{}↓ {}", if ws { "\n" } else { "" }, self.download_progress.1)?;
1086                ws = true;
1087            }
1088
1089            if ws {
1090                write!(f, "\n{:?}", self.total_time)?;
1091            }
1092        }
1093
1094        Ok(())
1095    }
1096}
1097impl_from_and_into_var! {
1098    fn from(metrics: Metrics) -> Progress {
1099        let mut status = Progress::indeterminate();
1100        if metrics.download_progress.1 > 0.bytes() {
1101            status = Progress::from_n_of(metrics.download_progress.0.0, metrics.download_progress.1.0);
1102        }
1103        if metrics.upload_progress.1 > 0.bytes() {
1104            let u_status = Progress::from_n_of(metrics.upload_progress.0.0, metrics.upload_progress.1.0);
1105            if status.is_indeterminate() {
1106                status = u_status;
1107            } else {
1108                status = status.and_fct(u_status.fct());
1109            }
1110        }
1111        status.with_msg(formatx!("{metrics}")).with_meta_mut(|mut m| {
1112            m.set(*METRICS_ID, metrics);
1113        })
1114    }
1115}
1116zng_state_map::static_id! {
1117    pub static ref METRICS_ID: zng_state_map::StateId<Metrics>;
1119}
1120
1121pub struct Client {
1125    client: isahc::HttpClient,
1126    cache: Option<Box<dyn CacheDb>>,
1127    cache_mode: Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>,
1128}
1129impl Default for Client {
1130    fn default() -> Self {
1131        Self::new()
1132    }
1133}
1134impl Clone for Client {
1135    fn clone(&self) -> Self {
1136        Client {
1137            client: self.client.clone(),
1138            cache: self.cache.as_ref().map(|b| b.clone_boxed()),
1139            cache_mode: self.cache_mode.clone(),
1140        }
1141    }
1142}
1143impl fmt::Debug for Client {
1144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1145        f.debug_struct("Client").finish_non_exhaustive()
1146    }
1147}
1148impl Client {
1149    pub fn new() -> Self {
1154        Client::builder()
1155            .cookies()
1156            .redirect_policy(RedirectPolicy::Limit(20))
1157            .connect_timeout(90.secs())
1158            .metrics(true)
1159            .build()
1160    }
1161
1162    pub fn builder() -> ClientBuilder {
1164        ClientBuilder {
1165            builder: isahc::HttpClient::builder(),
1166            cache: None,
1167            cache_mode: None,
1168        }
1169    }
1170
1171    pub fn cookie_jar(&self) -> Option<&CookieJar> {
1173        self.client.cookie_jar()
1174    }
1175
1176    pub async fn get(&self, uri: impl TryUri) -> Result<Response, Error> {
1178        self.send(Request::get(uri)?.build()).await
1179    }
1180
1181    pub async fn get_txt(&self, uri: impl TryUri) -> Result<Txt, Error> {
1183        let mut r = self.get(uri).await?;
1184        let r = r.text().await?;
1185        Ok(r)
1186    }
1187
1188    pub async fn get_bytes(&self, uri: impl TryUri) -> Result<Vec<u8>, Error> {
1190        let mut r = self.get(uri).await?;
1191        let r = r.bytes().await?;
1192        Ok(r)
1193    }
1194
1195    pub async fn get_json<O>(&self, uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
1197    where
1198        O: serde::de::DeserializeOwned + std::marker::Unpin,
1199    {
1200        let mut r = self.get(uri).await?;
1201        let r = r.json::<O>().await?;
1202        Ok(r)
1203    }
1204
1205    pub async fn head(&self, uri: impl TryUri) -> Result<Response, Error> {
1207        self.send(Request::head(uri)?.build()).await
1208    }
1209    pub async fn put(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1211        self.send(Request::put(uri)?.body(body)?).await
1212    }
1213
1214    pub async fn post(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1216        self.send(Request::post(uri)?.body(body)?).await
1217    }
1218
1219    pub async fn delete(&self, uri: impl TryUri) -> Result<Response, Error> {
1221        self.send(Request::delete(uri)?.build()).await
1222    }
1223
1224    pub async fn send(&self, request: Request) -> Result<Response, Error> {
1234        if let Some(db) = &self.cache {
1235            match self.cache_mode(&request) {
1236                CacheMode::NoCache => {
1237                    let response = self.client.send_async(request.req).await?;
1238                    let response = request.limits.check(response)?;
1239                    Ok(Response(response))
1240                }
1241                CacheMode::Default => self.send_cache_default(&**db, request, 0).await,
1242                CacheMode::Permanent => self.send_cache_permanent(&**db, request, 0).await,
1243                CacheMode::Error(e) => Err(e),
1244            }
1245        } else {
1246            let response = self.client.send_async(request.req).await?;
1247            let response = request.limits.check(response)?;
1248            Ok(Response(response))
1249        }
1250    }
1251
1252    #[async_recursion::async_recursion]
1253    async fn send_cache_default(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
1254        if retry_count == 3 {
1255            tracing::error!("retried cache 3 times, skipping cache");
1256            let response = self.client.send_async(request.req).await?;
1257            let response = request.limits.check(response)?;
1258            return Ok(Response(response));
1259        }
1260
1261        let key = CacheKey::new(&request.req);
1262        if let Some(policy) = db.policy(&key).await {
1263            match policy.before_request(&request.req) {
1264                BeforeRequest::Fresh(parts) => {
1265                    if let Some(body) = db.body(&key).await {
1266                        let response = isahc::Response::from_parts(parts, body.0);
1267                        let response = request.limits.check(response)?;
1268
1269                        Ok(Response(response))
1270                    } else {
1271                        tracing::error!("cache returned policy but not body");
1272                        db.remove(&key).await;
1273                        self.send_cache_default(db, request, retry_count + 1).await
1274                    }
1275                }
1276                BeforeRequest::Stale { request: parts, matches } => {
1277                    if matches {
1278                        let (_, body) = request.req.into_parts();
1279                        let request = Request {
1280                            req: isahc::Request::from_parts(parts, body),
1281                            limits: request.limits,
1282                        };
1283                        let policy_request = request.clone_with(()).unwrap().req;
1284                        let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);
1285
1286                        let response = self.client.send_async(request.req).await?;
1287                        let response = request.limits.check(response)?;
1288
1289                        match policy.after_response(&policy_request, &response) {
1290                            AfterResponse::NotModified(policy, parts) => {
1291                                if let Some(body) = db.body(&key).await {
1292                                    let response = isahc::Response::from_parts(parts, body.0);
1293
1294                                    db.set_policy(&key, policy).await;
1295
1296                                    Ok(Response(response))
1297                                } else {
1298                                    tracing::error!("cache returned policy but not body");
1299                                    db.remove(&key).await;
1300
1301                                    if no_req_body {
1302                                        self.send_cache_default(
1303                                            db,
1304                                            Request {
1305                                                req: policy_request,
1306                                                limits: request.limits,
1307                                            },
1308                                            retry_count + 1,
1309                                        )
1310                                        .await
1311                                    } else {
1312                                        Err(std::io::Error::new(
1313                                            std::io::ErrorKind::NotFound,
1314                                            "cache returned policy but not body, cannot auto-retry",
1315                                        )
1316                                        .into())
1317                                    }
1318                                }
1319                            }
1320                            AfterResponse::Modified(policy, parts) => {
1321                                if policy.should_store() {
1322                                    let (_, body) = response.into_parts();
1323                                    if let Some(body) = db.set(&key, policy, Body(body)).await {
1324                                        let response = isahc::Response::from_parts(parts, body.0);
1325
1326                                        Ok(Response(response))
1327                                    } else {
1328                                        tracing::error!("cache db failed to store body");
1329                                        db.remove(&key).await;
1330
1331                                        if no_req_body {
1332                                            self.send_cache_default(
1333                                                db,
1334                                                Request {
1335                                                    req: policy_request,
1336                                                    limits: request.limits,
1337                                                },
1338                                                retry_count + 1,
1339                                            )
1340                                            .await
1341                                        } else {
1342                                            Err(std::io::Error::new(
1343                                                std::io::ErrorKind::NotFound,
1344                                                "cache db failed to store body, cannot auto-retry",
1345                                            )
1346                                            .into())
1347                                        }
1348                                    }
1349                                } else {
1350                                    db.remove(&key).await;
1351
1352                                    Ok(Response(response))
1353                                }
1354                            }
1355                        }
1356                    } else {
1357                        tracing::error!("cache policy did not match request, {request:?}");
1358                        db.remove(&key).await;
1359                        let response = self.client.send_async(request.req).await?;
1360                        let response = request.limits.check(response)?;
1361                        Ok(Response(response))
1362                    }
1363                }
1364            }
1365        } else {
1366            let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);
1367            let policy_request = request.clone_with(()).unwrap().req;
1368
1369            let response = self.client.send_async(request.req).await?;
1370            let response = request.limits.check(response)?;
1371
1372            let policy = CachePolicy::new(&policy_request, &response);
1373
1374            if policy.should_store() {
1375                let (parts, body) = response.into_parts();
1376
1377                if let Some(body) = db.set(&key, policy, Body(body)).await {
1378                    let response = isahc::Response::from_parts(parts, body.0);
1379
1380                    Ok(Response(response))
1381                } else {
1382                    tracing::error!("cache db failed to store body");
1383                    db.remove(&key).await;
1384
1385                    if no_req_body {
1386                        self.send_cache_default(
1387                            db,
1388                            Request {
1389                                req: policy_request,
1390                                limits: request.limits,
1391                            },
1392                            retry_count + 1,
1393                        )
1394                        .await
1395                    } else {
1396                        Err(std::io::Error::new(std::io::ErrorKind::NotFound, "cache db failed to store body, cannot auto-retry").into())
1397                    }
1398                }
1399            } else {
1400                Ok(Response(response))
1401            }
1402        }
1403    }
1404
1405    #[async_recursion::async_recursion]
1406    async fn send_cache_permanent(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
1407        if retry_count == 3 {
1408            tracing::error!("retried cache 3 times, skipping cache");
1409            let response = self.client.send_async(request.req).await?;
1410            let response = request.limits.check(response)?;
1411            return Ok(Response(response));
1412        }
1413
1414        let key = CacheKey::new(&request.req);
1415        if let Some(policy) = db.policy(&key).await {
1416            if let Some(body) = db.body(&key).await {
1417                match policy.before_request(&request.req) {
1418                    BeforeRequest::Fresh(p) => {
1419                        let response = isahc::Response::from_parts(p, body.0);
1420                        let response = request.limits.check(response)?;
1421
1422                        if !policy.is_permanent() {
1423                            db.set_policy(&key, CachePolicy::new_permanent(&response)).await;
1424                        }
1425
1426                        Ok(Response(response))
1427                    }
1428                    BeforeRequest::Stale { request: parts, .. } => {
1429                        let limits = request.limits.clone();
1432
1433                        let (_, req_body) = request.req.into_parts();
1434                        let request = isahc::Request::from_parts(parts, req_body);
1435
1436                        let response = self.client.send_async(request).await?;
1437                        let response = limits.check(response)?;
1438
1439                        let (parts, _) = response.into_parts();
1440
1441                        let response = isahc::Response::from_parts(parts, body.0);
1442
1443                        db.set_policy(&key, CachePolicy::new_permanent(&response)).await;
1444
1445                        Ok(Response(response))
1446                    }
1447                }
1448            } else {
1449                tracing::error!("cache returned policy but not body");
1450                db.remove(&key).await;
1451                self.send_cache_permanent(db, request, retry_count + 1).await
1452            }
1453        } else {
1454            let backup_request = if request.req.body().len().map(|l| l == 0).unwrap_or(false) {
1455                Some(request.clone_with(()).unwrap())
1456            } else {
1457                None
1458            };
1459
1460            let response = self.client.send_async(request.req).await?;
1461            let response = request.limits.check(response)?;
1462            let policy = CachePolicy::new_permanent(&response);
1463
1464            let (parts, body) = response.into_parts();
1465
1466            if let Some(body) = db.set(&key, policy, Body(body)).await {
1467                let response = isahc::Response::from_parts(parts, body.0);
1468                Ok(Response(response))
1469            } else {
1470                tracing::error!("cache db failed to store body");
1471                db.remove(&key).await;
1472
1473                if let Some(request) = backup_request {
1474                    self.send_cache_permanent(db, request, retry_count + 1).await
1475                } else {
1476                    Err(std::io::Error::new(
1477                        std::io::ErrorKind::NotFound,
1478                        "cache db failed to store permanent body, cannot auto-retry",
1479                    )
1480                    .into())
1481                }
1482            }
1483        }
1484    }
1485
1486    pub fn cache(&self) -> Option<&dyn CacheDb> {
1488        self.cache.as_deref()
1489    }
1490
1491    pub fn cache_mode(&self, request: &Request) -> CacheMode {
1493        if self.cache.is_none() || request.method() != Method::GET {
1494            CacheMode::NoCache
1495        } else {
1496            (self.cache_mode)(request)
1497        }
1498    }
1499}
1500impl From<Client> for isahc::HttpClient {
1501    fn from(c: Client) -> Self {
1502        c.client
1503    }
1504}
1505impl From<isahc::HttpClient> for Client {
1506    fn from(client: isahc::HttpClient) -> Self {
1507        Self {
1508            client,
1509            cache: None,
1510            cache_mode: Arc::new(|_| CacheMode::default()),
1511        }
1512    }
1513}
1514
1515pub struct ClientBuilder {
1527    builder: isahc::HttpClientBuilder,
1528    cache: Option<Box<dyn CacheDb>>,
1529    cache_mode: Option<Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>>,
1530}
1531impl Default for ClientBuilder {
1532    fn default() -> Self {
1533        Client::builder()
1534    }
1535}
1536impl ClientBuilder {
1537    pub fn new() -> Self {
1539        Client::builder()
1540    }
1541
1542    pub fn build(self) -> Client {
1544        Client {
1545            client: self.builder.build().unwrap(),
1546            cache: self.cache,
1547            cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1548        }
1549    }
1550
1551    pub fn build_custom<F>(self, custom: F) -> Result<Client, Error>
1555    where
1556        F: FnOnce(isahc::HttpClientBuilder) -> Result<isahc::HttpClient, Error>,
1557    {
1558        custom(self.builder).map(|c| Client {
1559            client: c,
1560            cache: self.cache,
1561            cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1562        })
1563    }
1564
1565    pub fn default_header(self, key: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
1567        Ok(Self {
1568            builder: self.builder.default_header(key.try_header_name()?, value.try_header_value()?),
1569            cache: self.cache,
1570            cache_mode: self.cache_mode,
1571        })
1572    }
1573
1574    pub fn cookies(self) -> Self {
1576        Self {
1577            builder: self.builder.cookies(),
1578            cache: self.cache,
1579            cache_mode: self.cache_mode,
1580        }
1581    }
1582
1583    pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
1587        Self {
1588            builder: self.builder.cookie_jar(cookie_jar),
1589            cache: self.cache,
1590            cache_mode: self.cache_mode,
1591        }
1592    }
1593
1594    pub fn timeout(self, timeout: Duration) -> Self {
1605        Self {
1606            builder: self.builder.timeout(timeout),
1607            cache: self.cache,
1608            cache_mode: self.cache_mode,
1609        }
1610    }
1611
1612    pub fn connect_timeout(self, timeout: Duration) -> Self {
1616        Self {
1617            builder: self.builder.connect_timeout(timeout),
1618            cache: self.cache,
1619            cache_mode: self.cache_mode,
1620        }
1621    }
1622
1623    pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
1627        Self {
1628            builder: self.builder.low_speed_timeout(low_speed, timeout),
1629            cache: self.cache,
1630            cache_mode: self.cache_mode,
1631        }
1632    }
1633
1634    pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
1638        if !matches!(policy, RedirectPolicy::None) {
1639            Self {
1640                builder: self.builder.redirect_policy(policy).auto_referer(),
1641                cache: self.cache,
1642                cache_mode: self.cache_mode,
1643            }
1644        } else {
1645            Self {
1646                builder: self.builder.redirect_policy(policy),
1647                cache: self.cache,
1648                cache_mode: self.cache_mode,
1649            }
1650        }
1651    }
1652
1653    pub fn auto_decompress(self, enabled: bool) -> Self {
1661        Self {
1662            builder: self.builder.automatic_decompression(enabled),
1663            cache: self.cache,
1664            cache_mode: self.cache_mode,
1665        }
1666    }
1667
1668    pub fn max_upload_speed(self, max: u64) -> Self {
1670        Self {
1671            builder: self.builder.max_upload_speed(max),
1672            cache: self.cache,
1673            cache_mode: self.cache_mode,
1674        }
1675    }
1676
1677    pub fn max_download_speed(self, max: u64) -> Self {
1679        Self {
1680            builder: self.builder.max_download_speed(max),
1681            cache: self.cache,
1682            cache_mode: self.cache_mode,
1683        }
1684    }
1685
1686    pub fn metrics(self, enable: bool) -> Self {
1692        Self {
1693            builder: self.builder.metrics(enable),
1694            cache: self.cache,
1695            cache_mode: self.cache_mode,
1696        }
1697    }
1698
1699    pub fn cache(self, cache: impl CacheDb) -> Self {
1703        Self {
1704            builder: self.builder,
1705            cache: Some(Box::new(cache)),
1706            cache_mode: self.cache_mode,
1707        }
1708    }
1709
1710    pub fn cache_mode(self, selector: impl Fn(&Request) -> CacheMode + Send + Sync + 'static) -> Self {
1719        Self {
1720            builder: self.builder,
1721            cache: self.cache,
1722            cache_mode: Some(Arc::new(selector)),
1723        }
1724    }
1725}
1726
1727#[derive(Debug, Clone)]
1729#[non_exhaustive]
1730pub enum Error {
1731    Client(isahc::Error),
1733    MaxLength {
1737        content_length: Option<ByteLength>,
1739        max_length: ByteLength,
1741    },
1742    RequireLength,
1746}
1747impl StdError for Error {
1748    fn source(&self) -> Option<&(dyn StdError + 'static)> {
1749        match self {
1750            Error::Client(e) => Some(e),
1751            _ => None,
1752        }
1753    }
1754}
1755impl From<isahc::Error> for Error {
1756    fn from(e: isahc::Error) -> Self {
1757        if let Some(e) = e
1758            .source()
1759            .and_then(|e| e.downcast_ref::<std::io::Error>())
1760            .and_then(|e| e.get_ref())
1761        {
1762            if let Some(e) = e.downcast_ref::<MaxLengthError>() {
1763                return Error::MaxLength {
1764                    content_length: e.0,
1765                    max_length: e.1,
1766                };
1767            }
1768            if e.downcast_ref::<RequireLengthError>().is_some() {
1769                return Error::RequireLength;
1770            }
1771        }
1772        Error::Client(e)
1773    }
1774}
1775impl From<isahc::http::Error> for Error {
1776    fn from(e: isahc::http::Error) -> Self {
1777        isahc::Error::from(e).into()
1778    }
1779}
1780impl From<std::io::Error> for Error {
1781    fn from(e: std::io::Error) -> Self {
1782        isahc::Error::from(e).into()
1783    }
1784}
1785impl fmt::Display for Error {
1786    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1787        match self {
1788            Error::Client(e) => write!(f, "{e}"),
1789            Error::MaxLength {
1790                content_length,
1791                max_length,
1792            } => write!(f, "{}", MaxLengthError(*content_length, *max_length)),
1793            Error::RequireLength => write!(f, "{}", RequireLengthError {}),
1794        }
1795    }
1796}
1797
1798#[derive(Debug)]
1801struct MaxLengthError(Option<ByteLength>, ByteLength);
1802impl fmt::Display for MaxLengthError {
1803    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1804        if let Some(l) = self.0 {
1805            write!(f, "content-length of {l} exceeds limit of {}", self.1)
1806        } else {
1807            write!(f, "download reached limit of {}", self.1)
1808        }
1809    }
1810}
1811impl StdError for MaxLengthError {}
1812
1813#[derive(Debug)]
1814#[non_exhaustive]
1815struct RequireLengthError {}
1816impl fmt::Display for RequireLengthError {
1817    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1818        write!(f, "content-length is required")
1819    }
1820}
1821impl StdError for RequireLengthError {}