1#![cfg(feature = "http")]
2#![expect(clippy::type_complexity)]
4
5mod cache;
25mod util;
26
27pub use cache::*;
28use zng_var::impl_from_and_into_var;
29
30use std::convert::TryFrom;
31use std::error::Error as StdError;
32use std::pin::Pin;
33use std::sync::Arc;
34use std::time::Duration;
35use std::{fmt, mem};
36
37use crate::Progress;
38
39use super::io::AsyncRead;
40
41use isahc::config::Configurable;
42pub use isahc::config::RedirectPolicy;
43pub use isahc::cookies::{Cookie, CookieJar};
44pub use isahc::http::{Method, StatusCode, Uri, header, uri};
45
46use futures_lite::io::{AsyncReadExt, BufReader};
47use isahc::{AsyncReadResponseExt, ResponseExt};
48use parking_lot::{Mutex, const_mutex};
49
50use zng_txt::{Txt, formatx};
51use zng_unit::*;
52
53#[diagnostic::on_unimplemented(note = "`TryUri` is implemented for all `T` where `Uri: TryFrom<T, Error: Into<isahc::http::Error>>`")]
57pub trait TryUri {
58 fn try_uri(self) -> Result<Uri, Error>;
60}
61impl<U> TryUri for U
62where
63 Uri: TryFrom<U>,
64 <Uri as TryFrom<U>>::Error: Into<isahc::http::Error>,
65{
66 fn try_uri(self) -> Result<Uri, Error> {
67 Uri::try_from(self).map_err(|e| e.into().into())
68 }
69}
70
71#[diagnostic::on_unimplemented(note = "`TryMethod` is implemented for all `T` where `Method: TryFrom<T, Error: Into<isahc::http::Error>>`")]
75pub trait TryMethod {
76 fn try_method(self) -> Result<Method, Error>;
78}
79impl<U> TryMethod for U
80where
81 Method: TryFrom<U>,
82 <isahc::http::Method as TryFrom<U>>::Error: Into<isahc::http::Error>,
83{
84 fn try_method(self) -> Result<Method, Error> {
85 Method::try_from(self).map_err(|e| e.into().into())
86 }
87}
88
89#[diagnostic::on_unimplemented(note = "`TryBody` is implemented for all `T` where `Body: TryFrom<T, Error: Into<isahc::http::Error>>`")]
94pub trait TryBody {
95 fn try_body(self) -> Result<Body, Error>;
97}
98impl<U> TryBody for U
99where
100 isahc::AsyncBody: TryFrom<U>,
101 <isahc::AsyncBody as TryFrom<U>>::Error: Into<isahc::http::Error>,
102{
103 fn try_body(self) -> Result<Body, Error> {
104 match isahc::AsyncBody::try_from(self) {
105 Ok(r) => Ok(Body(r)),
106 Err(e) => Err(e.into().into()),
107 }
108 }
109}
110
111#[diagnostic::on_unimplemented(
116 note = "`TryHeaderName` is implemented for all `T` where `HeaderName: TryFrom<T, Error: Into<isahc::http::Error>>`"
117)]
118pub trait TryHeaderName {
119 fn try_header_name(self) -> Result<header::HeaderName, Error>;
121}
122impl<U> TryHeaderName for U
123where
124 header::HeaderName: TryFrom<U>,
125 <header::HeaderName as TryFrom<U>>::Error: Into<isahc::http::Error>,
126{
127 fn try_header_name(self) -> Result<header::HeaderName, Error> {
128 header::HeaderName::try_from(self).map_err(|e| e.into().into())
129 }
130}
131
132#[diagnostic::on_unimplemented(
137 note = "`TryHeaderValue` is implemented for all `T` where `HeaderValue: TryFrom<T, Error: Into<isahc::http::Error>>`"
138)]
139pub trait TryHeaderValue {
140 fn try_header_value(self) -> Result<header::HeaderValue, Error>;
142}
143impl<U> TryHeaderValue for U
144where
145 header::HeaderValue: TryFrom<U>,
146 <header::HeaderValue as TryFrom<U>>::Error: Into<isahc::http::Error>,
147{
148 fn try_header_value(self) -> Result<header::HeaderValue, Error> {
149 header::HeaderValue::try_from(self).map_err(|e| e.into().into())
150 }
151}
152
153#[derive(Debug)]
157pub struct Request {
158 req: isahc::Request<Body>,
159 limits: ResponseLimits,
160}
161impl Request {
162 pub fn builder() -> RequestBuilder {
180 RequestBuilder::start(isahc::Request::builder())
181 }
182
183 pub fn get(uri: impl TryUri) -> Result<RequestBuilder, Error> {
195 Ok(RequestBuilder::start(isahc::Request::get(uri.try_uri()?)))
196 }
197
198 pub fn put(uri: impl TryUri) -> Result<RequestBuilder, Error> {
210 Ok(RequestBuilder::start(isahc::Request::put(uri.try_uri()?)))
211 }
212
213 pub fn post(uri: impl TryUri) -> Result<RequestBuilder, Error> {
225 Ok(RequestBuilder::start(isahc::Request::post(uri.try_uri()?)))
226 }
227
228 pub fn delete(uri: impl TryUri) -> Result<RequestBuilder, Error> {
240 Ok(RequestBuilder::start(isahc::Request::delete(uri.try_uri()?)))
241 }
242
243 pub fn patch(uri: impl TryUri) -> Result<RequestBuilder, Error> {
255 Ok(RequestBuilder::start(isahc::Request::patch(uri.try_uri()?)))
256 }
257
258 pub fn head(uri: impl TryUri) -> Result<RequestBuilder, Error> {
270 Ok(RequestBuilder::start(isahc::Request::head(uri.try_uri()?)))
271 }
272
273 pub fn uri(&self) -> &Uri {
275 self.req.uri()
276 }
277
278 pub fn method(&self) -> &Method {
280 self.req.method()
281 }
282
283 pub fn headers(&self) -> &header::HeaderMap {
285 self.req.headers()
286 }
287
288 pub fn clone_with(&self, body: impl TryBody) -> Result<Self, Error> {
290 let body = body.try_body()?;
291
292 let mut req = isahc::Request::new(body);
293 *req.method_mut() = self.req.method().clone();
294 *req.uri_mut() = self.req.uri().clone();
295 *req.version_mut() = self.req.version();
296 let headers = req.headers_mut();
297 for (name, value) in self.headers() {
298 headers.insert(name.clone(), value.clone());
299 }
300
301 Ok(Self {
302 req,
303 limits: self.limits.clone(),
304 })
305 }
306}
307
308#[derive(Debug, Default, Clone)]
309struct ResponseLimits {
310 max_length: Option<ByteLength>,
311 require_length: bool,
312}
313impl ResponseLimits {
314 fn check(&self, response: isahc::Response<isahc::AsyncBody>) -> Result<isahc::Response<isahc::AsyncBody>, Error> {
315 if self.require_length || self.max_length.is_some() {
316 let response = Response(response);
317 if let Some(len) = response.content_len() {
318 if let Some(max) = self.max_length
319 && max < len
320 {
321 return Err(Error::MaxLength {
322 content_length: Some(len),
323 max_length: max,
324 });
325 }
326 } else if self.require_length {
327 return Err(Error::RequireLength);
328 }
329
330 if let Some(max) = self.max_length {
331 let (parts, body) = response.0.into_parts();
332 let response = isahc::Response::from_parts(
333 parts,
334 isahc::AsyncBody::from_reader(super::io::ReadLimited::new(body, max, move || {
335 std::io::Error::new(std::io::ErrorKind::InvalidData, MaxLengthError(None, max))
336 })),
337 );
338
339 Ok(response)
340 } else {
341 Ok(response.0)
342 }
343 } else {
344 Ok(response)
345 }
346 }
347}
348
349#[derive(Debug)]
353pub struct RequestBuilder {
354 builder: isahc::http::request::Builder,
355 limits: ResponseLimits,
356}
357impl Default for RequestBuilder {
358 fn default() -> Self {
359 Request::builder()
360 }
361}
362impl RequestBuilder {
363 pub fn new() -> Self {
365 Request::builder()
366 }
367
368 fn start(builder: isahc::http::request::Builder) -> Self {
369 Self {
370 builder,
371 limits: ResponseLimits::default(),
372 }
373 }
374
375 pub fn method(self, method: impl TryMethod) -> Result<Self, Error> {
377 Ok(Self {
378 builder: self.builder.method(method.try_method()?),
379 limits: self.limits,
380 })
381 }
382
383 pub fn uri(self, uri: impl TryUri) -> Result<Self, Error> {
385 Ok(Self {
386 builder: self.builder.uri(uri.try_uri()?),
387 limits: self.limits,
388 })
389 }
390
391 pub fn header(self, name: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
393 Ok(Self {
394 builder: self.builder.header(name.try_header_name()?, value.try_header_value()?),
395 limits: self.limits,
396 })
397 }
398
399 pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
403 Self {
404 builder: self.builder.cookie_jar(cookie_jar),
405 limits: self.limits,
406 }
407 }
408
409 pub fn timeout(self, timeout: Duration) -> Self {
420 Self {
421 builder: self.builder.timeout(timeout),
422 limits: self.limits,
423 }
424 }
425
426 pub fn connect_timeout(self, timeout: Duration) -> Self {
430 Self {
431 builder: self.builder.connect_timeout(timeout),
432 limits: self.limits,
433 }
434 }
435
436 pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
440 Self {
441 builder: self.builder.low_speed_timeout(low_speed, timeout),
442 limits: self.limits,
443 }
444 }
445
446 pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
452 if !matches!(policy, RedirectPolicy::None) {
453 Self {
454 builder: self.builder.redirect_policy(policy).auto_referer(),
455 limits: self.limits,
456 }
457 } else {
458 Self {
459 builder: self.builder.redirect_policy(policy),
460 limits: self.limits,
461 }
462 }
463 }
464
465 pub fn auto_decompress(self, enabled: bool) -> Self {
473 Self {
474 builder: self.builder.automatic_decompression(enabled),
475 limits: self.limits,
476 }
477 }
478
479 pub fn max_upload_speed(self, max: u64) -> Self {
481 Self {
482 builder: self.builder.max_upload_speed(max),
483 limits: self.limits,
484 }
485 }
486
487 pub fn max_download_speed(self, max: u64) -> Self {
489 Self {
490 builder: self.builder.max_download_speed(max),
491 limits: self.limits,
492 }
493 }
494
495 pub fn max_length(mut self, max: ByteLength) -> Self {
505 self.limits.max_length = Some(max);
506 self
507 }
508
509 pub fn require_length(mut self, require: bool) -> Self {
511 self.limits.require_length = require;
512 self
513 }
514
515 pub fn metrics(self, enable: bool) -> Self {
521 Self {
522 builder: self.builder.metrics(enable),
523 limits: self.limits,
524 }
525 }
526
527 pub fn build(self) -> Request {
529 self.body(()).unwrap()
530 }
531
532 pub fn body(self, body: impl TryBody) -> Result<Request, Error> {
534 Ok(Request {
535 req: self.builder.body(body.try_body()?).unwrap(),
536 limits: self.limits,
537 })
538 }
539
540 pub fn build_custom<F>(self, custom: F) -> Result<Request, Error>
544 where
545 F: FnOnce(isahc::http::request::Builder) -> isahc::http::Result<isahc::Request<isahc::AsyncBody>>,
546 {
547 let req = custom(self.builder)?;
548 Ok(Request {
549 req: req.map(Body),
550 limits: self.limits,
551 })
552 }
553}
554
555pub type ResponseParts = isahc::http::response::Parts;
557
558#[derive(Debug)]
560pub struct Response(isahc::Response<isahc::AsyncBody>);
561impl Response {
562 pub fn status(&self) -> StatusCode {
564 self.0.status()
565 }
566
567 pub fn headers(&self) -> &header::HeaderMap<header::HeaderValue> {
569 self.0.headers()
570 }
571
572 pub fn content_len(&self) -> Option<ByteLength> {
574 self.0.body().len().map(|l| ByteLength(l as usize))
575 }
576
577 pub fn cookie_jar(&self) -> Option<&CookieJar> {
581 self.0.cookie_jar()
582 }
583
584 pub async fn text(&mut self) -> std::io::Result<Txt> {
586 self.0.text().await.map(Txt::from)
587 }
588
589 pub fn effective_uri(&self) -> Option<&Uri> {
593 self.0.effective_uri()
594 }
595
596 pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
598 Body::bytes_impl(self.0.body_mut()).await
599 }
600
601 pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
603 BufReader::new(self.0.body_mut()).read(buf).await
604 }
605
606 pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
608 BufReader::new(self.0.body_mut()).read_exact(buf).await
609 }
610
611 pub async fn json<O>(&mut self) -> Result<O, serde_json::Error>
613 where
614 O: serde::de::DeserializeOwned + std::marker::Unpin,
615 {
616 self.0.json().await
617 }
618
619 pub fn metrics(&self) -> Metrics {
624 self.0.metrics().map(Metrics::from_isahc).unwrap_or_else(Metrics::zero)
625 }
626
627 pub async fn consume(&mut self) -> std::io::Result<()> {
636 self.0.consume().await
637 }
638
639 pub fn new_message(status: impl Into<StatusCode>, msg: impl Into<String>) -> Self {
641 let status = status.into();
642 let msg = msg.into().into_bytes();
643 let msg = futures_lite::io::Cursor::new(msg);
644 let mut r = isahc::Response::new(isahc::AsyncBody::from_reader(msg));
645 *r.status_mut() = status;
646 Self(r)
647 }
648
649 pub fn new(status: StatusCode, headers: header::HeaderMap<header::HeaderValue>, body: Body) -> Self {
651 let mut r = isahc::Response::new(body.0);
652 *r.status_mut() = status;
653 *r.headers_mut() = headers;
654 Self(r)
655 }
656
657 pub fn into_parts(self) -> (ResponseParts, Body) {
659 let (p, b) = self.0.into_parts();
660 (p, Body(b))
661 }
662
663 pub fn from_parts(parts: ResponseParts, body: Body) -> Self {
665 Self(isahc::Response::from_parts(parts, body.0))
666 }
667}
668impl From<Response> for isahc::Response<isahc::AsyncBody> {
669 fn from(r: Response) -> Self {
670 r.0
671 }
672}
673
674#[derive(Debug, Default)]
678pub struct Body(isahc::AsyncBody);
679impl Body {
680 pub fn empty() -> Body {
684 Body(isahc::AsyncBody::empty())
685 }
686
687 pub fn from_bytes_static(bytes: impl AsRef<[u8]> + 'static) -> Self {
694 Body(isahc::AsyncBody::from_bytes_static(bytes))
695 }
696
697 pub fn from_reader(read: impl AsyncRead + Send + Sync + 'static) -> Self {
699 Body(isahc::AsyncBody::from_reader(read))
700 }
701
702 pub fn from_reader_sized(read: impl AsyncRead + Send + Sync + 'static, size: u64) -> Self {
704 Body(isahc::AsyncBody::from_reader_sized(read, size))
705 }
706
707 pub fn is_empty(&self) -> bool {
713 self.0.is_empty()
714 }
715
716 pub fn len(&self) -> Option<u64> {
718 self.0.len()
719 }
720
721 pub fn reset(&mut self) -> bool {
725 self.0.reset()
726 }
727
728 pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
730 Self::bytes_impl(&mut self.0).await
731 }
732 async fn bytes_impl(body: &mut isahc::AsyncBody) -> std::io::Result<Vec<u8>> {
733 let cap = body.len().unwrap_or(1024);
734 let mut bytes = Vec::with_capacity(cap as usize);
735 super::io::copy(body, &mut bytes).await?;
736 Ok(bytes)
737 }
738
739 pub async fn text_utf8(&mut self) -> Result<Txt, Box<dyn std::error::Error>> {
743 let bytes = self.bytes().await?;
744 let r = String::from_utf8(bytes)?;
745 Ok(Txt::from(r))
746 }
747
748 pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
750 BufReader::new(&mut self.0).read(buf).await
751 }
752
753 pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
755 BufReader::new(&mut self.0).read_exact(buf).await
756 }
757}
758impl From<Body> for isahc::AsyncBody {
759 fn from(r: Body) -> Self {
760 r.0
761 }
762}
763impl From<isahc::AsyncBody> for Body {
764 fn from(r: isahc::AsyncBody) -> Self {
765 Body(r)
766 }
767}
768impl From<()> for Body {
769 fn from(body: ()) -> Self {
770 Body(body.into())
771 }
772}
773impl From<String> for Body {
774 fn from(body: String) -> Self {
775 Body(body.into())
776 }
777}
778impl From<Txt> for Body {
779 fn from(body: Txt) -> Self {
780 Body(String::from(body).into())
781 }
782}
783impl From<Vec<u8>> for Body {
784 fn from(body: Vec<u8>) -> Self {
785 Body(body.into())
786 }
787}
788impl From<&'_ [u8]> for Body {
789 fn from(body: &[u8]) -> Self {
790 body.to_vec().into()
791 }
792}
793impl From<&'_ str> for Body {
794 fn from(body: &str) -> Self {
795 body.as_bytes().into()
796 }
797}
798impl<T: Into<Self>> From<Option<T>> for Body {
799 fn from(body: Option<T>) -> Self {
800 match body {
801 Some(body) => body.into(),
802 None => Self::empty(),
803 }
804 }
805}
806impl AsyncRead for Body {
807 fn poll_read(
808 self: std::pin::Pin<&mut Self>,
809 cx: &mut std::task::Context<'_>,
810 buf: &mut [u8],
811 ) -> std::task::Poll<std::io::Result<usize>> {
812 Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
813 }
814}
815
816pub async fn get(uri: impl TryUri) -> Result<Response, Error> {
820 default_client().get(uri).await
821}
822
823pub async fn get_txt(uri: impl TryUri) -> Result<Txt, Error> {
827 default_client().get_txt(uri).await
828}
829
830pub async fn get_bytes(uri: impl TryUri) -> Result<Vec<u8>, Error> {
834 default_client().get_bytes(uri).await
835}
836
837pub async fn get_json<O>(uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
841where
842 O: serde::de::DeserializeOwned + std::marker::Unpin,
843{
844 default_client().get_json(uri).await
845}
846
847pub async fn head(uri: impl TryUri) -> Result<Response, Error> {
851 default_client().head(uri).await
852}
853
854pub async fn put(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
858 default_client().put(uri, body).await
859}
860
861pub async fn post(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
865 default_client().post(uri, body).await
866}
867
868pub async fn delete(uri: impl TryUri) -> Result<Response, Error> {
872 default_client().delete(uri).await
873}
874
875pub async fn send(request: Request) -> Result<Response, Error> {
879 default_client().send(request).await
880}
881
882pub fn default_client() -> &'static Client {
892 use once_cell::sync::Lazy;
893
894 static SHARED: Lazy<Client> = Lazy::new(|| {
895 let ci = mem::replace(&mut *CLIENT_INIT.lock(), ClientInit::Inited);
896 if let ClientInit::Set(init) = ci {
897 init()
898 } else {
899 Client::new()
901 }
902 });
903 &SHARED
904}
905
906static CLIENT_INIT: Mutex<ClientInit> = const_mutex(ClientInit::None);
907
908enum ClientInit {
909 None,
910 Set(Box<dyn FnOnce() -> Client + Send>),
911 Inited,
912}
913
914pub fn set_default_client_init<I>(init: I) -> Result<(), DefaultAlreadyInitedError>
923where
924 I: FnOnce() -> Client + Send + 'static,
925{
926 let mut ci = CLIENT_INIT.lock();
927 if let ClientInit::Inited = &*ci {
928 Err(DefaultAlreadyInitedError {})
929 } else {
930 *ci = ClientInit::Set(Box::new(init));
931 Ok(())
932 }
933}
934
935#[derive(Debug, Clone, Copy)]
937#[non_exhaustive]
938pub struct DefaultAlreadyInitedError {}
939impl fmt::Display for DefaultAlreadyInitedError {
940 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
941 write!(f, "default client already initialized, can only set before first use")
942 }
943}
944impl std::error::Error for DefaultAlreadyInitedError {}
945
946#[derive(Debug, Clone, PartialEq, Eq)]
948#[non_exhaustive]
949pub struct Metrics {
950 pub upload_progress: (ByteLength, ByteLength),
952
953 pub upload_speed: ByteLength,
955
956 pub download_progress: (ByteLength, ByteLength),
958
959 pub download_speed: ByteLength,
961
962 pub name_lookup_time: Duration,
966
967 pub connect_time: Duration,
971
972 pub secure_connect_time: Duration,
976
977 pub transfer_start_time: Duration,
981
982 pub transfer_time: Duration,
987
988 pub total_time: Duration,
993
994 pub redirect_time: Duration,
997}
998impl Metrics {
999 pub fn from_isahc(m: &isahc::Metrics) -> Self {
1001 Self {
1002 upload_progress: {
1003 let (c, t) = m.upload_progress();
1004 ((c as usize).bytes(), (t as usize).bytes())
1005 },
1006 upload_speed: (m.upload_speed().round() as usize).bytes(),
1007 download_progress: {
1008 let (c, t) = m.download_progress();
1009 ((c as usize).bytes(), (t as usize).bytes())
1010 },
1011 download_speed: (m.download_speed().round() as usize).bytes(),
1012 name_lookup_time: m.name_lookup_time(),
1013 connect_time: m.connect_time(),
1014 secure_connect_time: m.secure_connect_time(),
1015 transfer_start_time: m.transfer_start_time(),
1016 transfer_time: m.transfer_time(),
1017 total_time: m.total_time(),
1018 redirect_time: m.redirect_time(),
1019 }
1020 }
1021
1022 pub fn zero() -> Self {
1024 Self {
1025 upload_progress: (0.bytes(), 0.bytes()),
1026 upload_speed: 0.bytes(),
1027 download_progress: (0.bytes(), 0.bytes()),
1028 download_speed: 0.bytes(),
1029 name_lookup_time: Duration::ZERO,
1030 connect_time: Duration::ZERO,
1031 secure_connect_time: Duration::ZERO,
1032 transfer_start_time: Duration::ZERO,
1033 transfer_time: Duration::ZERO,
1034 total_time: Duration::ZERO,
1035 redirect_time: Duration::ZERO,
1036 }
1037 }
1038}
1039impl From<isahc::Metrics> for Metrics {
1040 fn from(m: isahc::Metrics) -> Self {
1041 Metrics::from_isahc(&m)
1042 }
1043}
1044impl fmt::Display for Metrics {
1045 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1046 let mut ws = false; if self.upload_progress.0 != self.upload_progress.1 {
1049 write!(
1050 f,
1051 "↑ {} - {}, {}/s",
1052 self.upload_progress.0, self.upload_progress.1, self.upload_speed
1053 )?;
1054 ws = true;
1055 }
1056 if self.download_progress.0 != self.download_progress.1 {
1057 write!(
1058 f,
1059 "{}↓ {} - {}, {}/s",
1060 if ws { "\n" } else { "" },
1061 self.download_progress.0,
1062 self.download_progress.1,
1063 self.download_speed
1064 )?;
1065 ws = true;
1066 }
1067
1068 if !ws {
1069 if self.upload_progress.1.bytes() > 0 {
1070 write!(f, "↑ {}", self.upload_progress.1)?;
1071 ws = true;
1072 }
1073 if self.download_progress.1.bytes() > 0 {
1074 write!(f, "{}↓ {}", if ws { "\n" } else { "" }, self.download_progress.1)?;
1075 ws = true;
1076 }
1077
1078 if ws {
1079 write!(f, "\n{:?}", self.total_time)?;
1080 }
1081 }
1082
1083 Ok(())
1084 }
1085}
1086impl_from_and_into_var! {
1087 fn from(metrics: Metrics) -> Progress {
1088 let mut status = Progress::indeterminate();
1089 if metrics.download_progress.1 > 0.bytes() {
1090 status = Progress::from_n_of(metrics.download_progress.0 .0, metrics.download_progress.1 .0);
1091 }
1092 if metrics.upload_progress.1 > 0.bytes() {
1093 let u_status = Progress::from_n_of(metrics.upload_progress.0 .0, metrics.upload_progress.1 .0);
1094 if status.is_indeterminate() {
1095 status = u_status;
1096 } else {
1097 status = status.and_fct(u_status.fct());
1098 }
1099 }
1100 status.with_msg(formatx!("{metrics}")).with_meta_mut(|mut m| {
1101 m.set(*METRICS_ID, metrics);
1102 })
1103 }
1104}
1105zng_state_map::static_id! {
1106 pub static ref METRICS_ID: zng_state_map::StateId<Metrics>;
1108}
1109
1110pub struct Client {
1114 client: isahc::HttpClient,
1115 cache: Option<Box<dyn CacheDb>>,
1116 cache_mode: Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>,
1117}
1118impl Default for Client {
1119 fn default() -> Self {
1120 Self::new()
1121 }
1122}
1123impl Clone for Client {
1124 fn clone(&self) -> Self {
1125 Client {
1126 client: self.client.clone(),
1127 cache: self.cache.as_ref().map(|b| b.clone_boxed()),
1128 cache_mode: self.cache_mode.clone(),
1129 }
1130 }
1131}
1132impl fmt::Debug for Client {
1133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1134 f.debug_struct("Client").finish_non_exhaustive()
1135 }
1136}
1137impl Client {
1138 pub fn new() -> Self {
1143 Client::builder()
1144 .cookies()
1145 .redirect_policy(RedirectPolicy::Limit(20))
1146 .connect_timeout(90.secs())
1147 .metrics(true)
1148 .build()
1149 }
1150
1151 pub fn builder() -> ClientBuilder {
1153 ClientBuilder {
1154 builder: isahc::HttpClient::builder(),
1155 cache: None,
1156 cache_mode: None,
1157 }
1158 }
1159
1160 pub fn cookie_jar(&self) -> Option<&CookieJar> {
1162 self.client.cookie_jar()
1163 }
1164
1165 pub async fn get(&self, uri: impl TryUri) -> Result<Response, Error> {
1167 self.send(Request::get(uri)?.build()).await
1168 }
1169
1170 pub async fn get_txt(&self, uri: impl TryUri) -> Result<Txt, Error> {
1172 let mut r = self.get(uri).await?;
1173 let r = r.text().await?;
1174 Ok(r)
1175 }
1176
1177 pub async fn get_bytes(&self, uri: impl TryUri) -> Result<Vec<u8>, Error> {
1179 let mut r = self.get(uri).await?;
1180 let r = r.bytes().await?;
1181 Ok(r)
1182 }
1183
1184 pub async fn get_json<O>(&self, uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
1186 where
1187 O: serde::de::DeserializeOwned + std::marker::Unpin,
1188 {
1189 let mut r = self.get(uri).await?;
1190 let r = r.json::<O>().await?;
1191 Ok(r)
1192 }
1193
1194 pub async fn head(&self, uri: impl TryUri) -> Result<Response, Error> {
1196 self.send(Request::head(uri)?.build()).await
1197 }
1198 pub async fn put(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1200 self.send(Request::put(uri)?.body(body)?).await
1201 }
1202
1203 pub async fn post(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1205 self.send(Request::post(uri)?.body(body)?).await
1206 }
1207
1208 pub async fn delete(&self, uri: impl TryUri) -> Result<Response, Error> {
1210 self.send(Request::delete(uri)?.build()).await
1211 }
1212
1213 pub async fn send(&self, request: Request) -> Result<Response, Error> {
1223 if let Some(db) = &self.cache {
1224 match self.cache_mode(&request) {
1225 CacheMode::NoCache => {
1226 let response = self.client.send_async(request.req).await?;
1227 let response = request.limits.check(response)?;
1228 Ok(Response(response))
1229 }
1230 CacheMode::Default => self.send_cache_default(&**db, request, 0).await,
1231 CacheMode::Permanent => self.send_cache_permanent(&**db, request, 0).await,
1232 CacheMode::Error(e) => Err(e),
1233 }
1234 } else {
1235 let response = self.client.send_async(request.req).await?;
1236 let response = request.limits.check(response)?;
1237 Ok(Response(response))
1238 }
1239 }
1240
1241 #[async_recursion::async_recursion]
1242 async fn send_cache_default(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
1243 if retry_count == 3 {
1244 tracing::error!("retried cache 3 times, skipping cache");
1245 let response = self.client.send_async(request.req).await?;
1246 let response = request.limits.check(response)?;
1247 return Ok(Response(response));
1248 }
1249
1250 let key = CacheKey::new(&request.req);
1251 if let Some(policy) = db.policy(&key).await {
1252 match policy.before_request(&request.req) {
1253 BeforeRequest::Fresh(parts) => {
1254 if let Some(body) = db.body(&key).await {
1255 let response = isahc::Response::from_parts(parts, body.0);
1256 let response = request.limits.check(response)?;
1257
1258 Ok(Response(response))
1259 } else {
1260 tracing::error!("cache returned policy but not body");
1261 db.remove(&key).await;
1262 self.send_cache_default(db, request, retry_count + 1).await
1263 }
1264 }
1265 BeforeRequest::Stale { request: parts, matches } => {
1266 if matches {
1267 let (_, body) = request.req.into_parts();
1268 let request = Request {
1269 req: isahc::Request::from_parts(parts, body),
1270 limits: request.limits,
1271 };
1272 let policy_request = request.clone_with(()).unwrap().req;
1273 let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);
1274
1275 let response = self.client.send_async(request.req).await?;
1276 let response = request.limits.check(response)?;
1277
1278 match policy.after_response(&policy_request, &response) {
1279 AfterResponse::NotModified(policy, parts) => {
1280 if let Some(body) = db.body(&key).await {
1281 let response = isahc::Response::from_parts(parts, body.0);
1282
1283 db.set_policy(&key, policy).await;
1284
1285 Ok(Response(response))
1286 } else {
1287 tracing::error!("cache returned policy but not body");
1288 db.remove(&key).await;
1289
1290 if no_req_body {
1291 self.send_cache_default(
1292 db,
1293 Request {
1294 req: policy_request,
1295 limits: request.limits,
1296 },
1297 retry_count + 1,
1298 )
1299 .await
1300 } else {
1301 Err(std::io::Error::new(
1302 std::io::ErrorKind::NotFound,
1303 "cache returned policy but not body, cannot auto-retry",
1304 )
1305 .into())
1306 }
1307 }
1308 }
1309 AfterResponse::Modified(policy, parts) => {
1310 if policy.should_store() {
1311 let (_, body) = response.into_parts();
1312 if let Some(body) = db.set(&key, policy, Body(body)).await {
1313 let response = isahc::Response::from_parts(parts, body.0);
1314
1315 Ok(Response(response))
1316 } else {
1317 tracing::error!("cache db failed to store body");
1318 db.remove(&key).await;
1319
1320 if no_req_body {
1321 self.send_cache_default(
1322 db,
1323 Request {
1324 req: policy_request,
1325 limits: request.limits,
1326 },
1327 retry_count + 1,
1328 )
1329 .await
1330 } else {
1331 Err(std::io::Error::new(
1332 std::io::ErrorKind::NotFound,
1333 "cache db failed to store body, cannot auto-retry",
1334 )
1335 .into())
1336 }
1337 }
1338 } else {
1339 db.remove(&key).await;
1340
1341 Ok(Response(response))
1342 }
1343 }
1344 }
1345 } else {
1346 tracing::error!("cache policy did not match request, {request:?}");
1347 db.remove(&key).await;
1348 let response = self.client.send_async(request.req).await?;
1349 let response = request.limits.check(response)?;
1350 Ok(Response(response))
1351 }
1352 }
1353 }
1354 } else {
1355 let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);
1356 let policy_request = request.clone_with(()).unwrap().req;
1357
1358 let response = self.client.send_async(request.req).await?;
1359 let response = request.limits.check(response)?;
1360
1361 let policy = CachePolicy::new(&policy_request, &response);
1362
1363 if policy.should_store() {
1364 let (parts, body) = response.into_parts();
1365
1366 if let Some(body) = db.set(&key, policy, Body(body)).await {
1367 let response = isahc::Response::from_parts(parts, body.0);
1368
1369 Ok(Response(response))
1370 } else {
1371 tracing::error!("cache db failed to store body");
1372 db.remove(&key).await;
1373
1374 if no_req_body {
1375 self.send_cache_default(
1376 db,
1377 Request {
1378 req: policy_request,
1379 limits: request.limits,
1380 },
1381 retry_count + 1,
1382 )
1383 .await
1384 } else {
1385 Err(std::io::Error::new(std::io::ErrorKind::NotFound, "cache db failed to store body, cannot auto-retry").into())
1386 }
1387 }
1388 } else {
1389 Ok(Response(response))
1390 }
1391 }
1392 }
1393
1394 #[async_recursion::async_recursion]
1395 async fn send_cache_permanent(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
1396 if retry_count == 3 {
1397 tracing::error!("retried cache 3 times, skipping cache");
1398 let response = self.client.send_async(request.req).await?;
1399 let response = request.limits.check(response)?;
1400 return Ok(Response(response));
1401 }
1402
1403 let key = CacheKey::new(&request.req);
1404 if let Some(policy) = db.policy(&key).await {
1405 if let Some(body) = db.body(&key).await {
1406 match policy.before_request(&request.req) {
1407 BeforeRequest::Fresh(p) => {
1408 let response = isahc::Response::from_parts(p, body.0);
1409 let response = request.limits.check(response)?;
1410
1411 if !policy.is_permanent() {
1412 db.set_policy(&key, CachePolicy::new_permanent(&response)).await;
1413 }
1414
1415 Ok(Response(response))
1416 }
1417 BeforeRequest::Stale { request: parts, .. } => {
1418 let limits = request.limits.clone();
1421
1422 let (_, req_body) = request.req.into_parts();
1423 let request = isahc::Request::from_parts(parts, req_body);
1424
1425 let response = self.client.send_async(request).await?;
1426 let response = limits.check(response)?;
1427
1428 let (parts, _) = response.into_parts();
1429
1430 let response = isahc::Response::from_parts(parts, body.0);
1431
1432 db.set_policy(&key, CachePolicy::new_permanent(&response)).await;
1433
1434 Ok(Response(response))
1435 }
1436 }
1437 } else {
1438 tracing::error!("cache returned policy but not body");
1439 db.remove(&key).await;
1440 self.send_cache_permanent(db, request, retry_count + 1).await
1441 }
1442 } else {
1443 let backup_request = if request.req.body().len().map(|l| l == 0).unwrap_or(false) {
1444 Some(request.clone_with(()).unwrap())
1445 } else {
1446 None
1447 };
1448
1449 let response = self.client.send_async(request.req).await?;
1450 let response = request.limits.check(response)?;
1451 let policy = CachePolicy::new_permanent(&response);
1452
1453 let (parts, body) = response.into_parts();
1454
1455 if let Some(body) = db.set(&key, policy, Body(body)).await {
1456 let response = isahc::Response::from_parts(parts, body.0);
1457 Ok(Response(response))
1458 } else {
1459 tracing::error!("cache db failed to store body");
1460 db.remove(&key).await;
1461
1462 if let Some(request) = backup_request {
1463 self.send_cache_permanent(db, request, retry_count + 1).await
1464 } else {
1465 Err(std::io::Error::new(
1466 std::io::ErrorKind::NotFound,
1467 "cache db failed to store permanent body, cannot auto-retry",
1468 )
1469 .into())
1470 }
1471 }
1472 }
1473 }
1474
1475 pub fn cache(&self) -> Option<&dyn CacheDb> {
1477 self.cache.as_deref()
1478 }
1479
1480 pub fn cache_mode(&self, request: &Request) -> CacheMode {
1482 if self.cache.is_none() || request.method() != Method::GET {
1483 CacheMode::NoCache
1484 } else {
1485 (self.cache_mode)(request)
1486 }
1487 }
1488}
1489impl From<Client> for isahc::HttpClient {
1490 fn from(c: Client) -> Self {
1491 c.client
1492 }
1493}
1494impl From<isahc::HttpClient> for Client {
1495 fn from(client: isahc::HttpClient) -> Self {
1496 Self {
1497 client,
1498 cache: None,
1499 cache_mode: Arc::new(|_| CacheMode::default()),
1500 }
1501 }
1502}
1503
1504pub struct ClientBuilder {
1516 builder: isahc::HttpClientBuilder,
1517 cache: Option<Box<dyn CacheDb>>,
1518 cache_mode: Option<Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>>,
1519}
1520impl Default for ClientBuilder {
1521 fn default() -> Self {
1522 Client::builder()
1523 }
1524}
1525impl ClientBuilder {
1526 pub fn new() -> Self {
1528 Client::builder()
1529 }
1530
1531 pub fn build(self) -> Client {
1533 Client {
1534 client: self.builder.build().unwrap(),
1535 cache: self.cache,
1536 cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1537 }
1538 }
1539
1540 pub fn build_custom<F>(self, custom: F) -> Result<Client, Error>
1544 where
1545 F: FnOnce(isahc::HttpClientBuilder) -> Result<isahc::HttpClient, Error>,
1546 {
1547 custom(self.builder).map(|c| Client {
1548 client: c,
1549 cache: self.cache,
1550 cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1551 })
1552 }
1553
1554 pub fn default_header(self, key: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
1556 Ok(Self {
1557 builder: self.builder.default_header(key.try_header_name()?, value.try_header_value()?),
1558 cache: self.cache,
1559 cache_mode: self.cache_mode,
1560 })
1561 }
1562
1563 pub fn cookies(self) -> Self {
1565 Self {
1566 builder: self.builder.cookies(),
1567 cache: self.cache,
1568 cache_mode: self.cache_mode,
1569 }
1570 }
1571
1572 pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
1576 Self {
1577 builder: self.builder.cookie_jar(cookie_jar),
1578 cache: self.cache,
1579 cache_mode: self.cache_mode,
1580 }
1581 }
1582
1583 pub fn timeout(self, timeout: Duration) -> Self {
1594 Self {
1595 builder: self.builder.timeout(timeout),
1596 cache: self.cache,
1597 cache_mode: self.cache_mode,
1598 }
1599 }
1600
1601 pub fn connect_timeout(self, timeout: Duration) -> Self {
1605 Self {
1606 builder: self.builder.connect_timeout(timeout),
1607 cache: self.cache,
1608 cache_mode: self.cache_mode,
1609 }
1610 }
1611
1612 pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
1616 Self {
1617 builder: self.builder.low_speed_timeout(low_speed, timeout),
1618 cache: self.cache,
1619 cache_mode: self.cache_mode,
1620 }
1621 }
1622
1623 pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
1627 if !matches!(policy, RedirectPolicy::None) {
1628 Self {
1629 builder: self.builder.redirect_policy(policy).auto_referer(),
1630 cache: self.cache,
1631 cache_mode: self.cache_mode,
1632 }
1633 } else {
1634 Self {
1635 builder: self.builder.redirect_policy(policy),
1636 cache: self.cache,
1637 cache_mode: self.cache_mode,
1638 }
1639 }
1640 }
1641
1642 pub fn auto_decompress(self, enabled: bool) -> Self {
1650 Self {
1651 builder: self.builder.automatic_decompression(enabled),
1652 cache: self.cache,
1653 cache_mode: self.cache_mode,
1654 }
1655 }
1656
1657 pub fn max_upload_speed(self, max: u64) -> Self {
1659 Self {
1660 builder: self.builder.max_upload_speed(max),
1661 cache: self.cache,
1662 cache_mode: self.cache_mode,
1663 }
1664 }
1665
1666 pub fn max_download_speed(self, max: u64) -> Self {
1668 Self {
1669 builder: self.builder.max_download_speed(max),
1670 cache: self.cache,
1671 cache_mode: self.cache_mode,
1672 }
1673 }
1674
1675 pub fn metrics(self, enable: bool) -> Self {
1681 Self {
1682 builder: self.builder.metrics(enable),
1683 cache: self.cache,
1684 cache_mode: self.cache_mode,
1685 }
1686 }
1687
1688 pub fn cache(self, cache: impl CacheDb) -> Self {
1692 Self {
1693 builder: self.builder,
1694 cache: Some(Box::new(cache)),
1695 cache_mode: self.cache_mode,
1696 }
1697 }
1698
1699 pub fn cache_mode(self, selector: impl Fn(&Request) -> CacheMode + Send + Sync + 'static) -> Self {
1708 Self {
1709 builder: self.builder,
1710 cache: self.cache,
1711 cache_mode: Some(Arc::new(selector)),
1712 }
1713 }
1714}
1715
1716#[derive(Debug, Clone)]
1718#[non_exhaustive]
1719pub enum Error {
1720 Client(isahc::Error),
1722 MaxLength {
1726 content_length: Option<ByteLength>,
1728 max_length: ByteLength,
1730 },
1731 RequireLength,
1735}
1736impl StdError for Error {
1737 fn source(&self) -> Option<&(dyn StdError + 'static)> {
1738 match self {
1739 Error::Client(e) => Some(e),
1740 _ => None,
1741 }
1742 }
1743}
1744impl From<isahc::Error> for Error {
1745 fn from(e: isahc::Error) -> Self {
1746 if let Some(e) = e
1747 .source()
1748 .and_then(|e| e.downcast_ref::<std::io::Error>())
1749 .and_then(|e| e.get_ref())
1750 {
1751 if let Some(e) = e.downcast_ref::<MaxLengthError>() {
1752 return Error::MaxLength {
1753 content_length: e.0,
1754 max_length: e.1,
1755 };
1756 }
1757 if e.downcast_ref::<RequireLengthError>().is_some() {
1758 return Error::RequireLength;
1759 }
1760 }
1761 Error::Client(e)
1762 }
1763}
1764impl From<isahc::http::Error> for Error {
1765 fn from(e: isahc::http::Error) -> Self {
1766 isahc::Error::from(e).into()
1767 }
1768}
1769impl From<std::io::Error> for Error {
1770 fn from(e: std::io::Error) -> Self {
1771 isahc::Error::from(e).into()
1772 }
1773}
1774impl fmt::Display for Error {
1775 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1776 match self {
1777 Error::Client(e) => write!(f, "{e}"),
1778 Error::MaxLength {
1779 content_length,
1780 max_length,
1781 } => write!(f, "{}", MaxLengthError(*content_length, *max_length)),
1782 Error::RequireLength => write!(f, "{}", RequireLengthError {}),
1783 }
1784 }
1785}
1786
1787#[derive(Debug)]
1790struct MaxLengthError(Option<ByteLength>, ByteLength);
1791impl fmt::Display for MaxLengthError {
1792 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1793 if let Some(l) = self.0 {
1794 write!(f, "content-length of {l} exceeds limit of {}", self.1)
1795 } else {
1796 write!(f, "download reached limit of {}", self.1)
1797 }
1798 }
1799}
1800impl StdError for MaxLengthError {}
1801
1802#[derive(Debug)]
1803#[non_exhaustive]
1804struct RequireLengthError {}
1805impl fmt::Display for RequireLengthError {
1806 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1807 write!(f, "content-length is required")
1808 }
1809}
1810impl StdError for RequireLengthError {}