1#![cfg(feature = "http")]
2#![expect(clippy::type_complexity)]
4
5mod cache;
25mod util;
26
27pub use cache::*;
28use zng_var::impl_from_and_into_var;
29
30use std::convert::TryFrom;
31use std::error::Error as StdError;
32use std::pin::Pin;
33use std::sync::Arc;
34use std::time::Duration;
35use std::{fmt, mem};
36
37use crate::Progress;
38
39use super::io::AsyncRead;
40
41use isahc::config::Configurable;
42pub use isahc::config::RedirectPolicy;
43pub use isahc::cookies::{Cookie, CookieJar};
44pub use isahc::http::{Method, StatusCode, Uri, header, uri};
45
46use futures_lite::io::{AsyncReadExt, BufReader};
47use isahc::{AsyncReadResponseExt, ResponseExt};
48use parking_lot::{Mutex, const_mutex};
49
50use zng_txt::{Txt, formatx};
51use zng_unit::*;
52
53#[diagnostic::on_unimplemented(note = "`TryUri` is implemented for all `T` where `Uri: TryFrom<T, Error: Into<isahc::http::Error>>`")]
57pub trait TryUri {
58 fn try_uri(self) -> Result<Uri, Error>;
60}
61impl<U> TryUri for U
62where
63 Uri: TryFrom<U>,
64 <Uri as TryFrom<U>>::Error: Into<isahc::http::Error>,
65{
66 fn try_uri(self) -> Result<Uri, Error> {
67 Uri::try_from(self).map_err(|e| e.into().into())
68 }
69}
70
71#[diagnostic::on_unimplemented(note = "`TryMethod` is implemented for all `T` where `Method: TryFrom<T, Error: Into<isahc::http::Error>>`")]
75pub trait TryMethod {
76 fn try_method(self) -> Result<Method, Error>;
78}
79impl<U> TryMethod for U
80where
81 Method: TryFrom<U>,
82 <isahc::http::Method as TryFrom<U>>::Error: Into<isahc::http::Error>,
83{
84 fn try_method(self) -> Result<Method, Error> {
85 Method::try_from(self).map_err(|e| e.into().into())
86 }
87}
88
89#[diagnostic::on_unimplemented(note = "`TryBody` is implemented for all `T` where `Body: TryFrom<T, Error: Into<isahc::http::Error>>`")]
94pub trait TryBody {
95 fn try_body(self) -> Result<Body, Error>;
97}
98impl<U> TryBody for U
99where
100 isahc::AsyncBody: TryFrom<U>,
101 <isahc::AsyncBody as TryFrom<U>>::Error: Into<isahc::http::Error>,
102{
103 fn try_body(self) -> Result<Body, Error> {
104 match isahc::AsyncBody::try_from(self) {
105 Ok(r) => Ok(Body(r)),
106 Err(e) => Err(e.into().into()),
107 }
108 }
109}
110
111#[diagnostic::on_unimplemented(
116 note = "`TryHeaderName` is implemented for all `T` where `HeaderName: TryFrom<T, Error: Into<isahc::http::Error>>`"
117)]
118pub trait TryHeaderName {
119 fn try_header_name(self) -> Result<header::HeaderName, Error>;
121}
122impl<U> TryHeaderName for U
123where
124 header::HeaderName: TryFrom<U>,
125 <header::HeaderName as TryFrom<U>>::Error: Into<isahc::http::Error>,
126{
127 fn try_header_name(self) -> Result<header::HeaderName, Error> {
128 header::HeaderName::try_from(self).map_err(|e| e.into().into())
129 }
130}
131
132#[diagnostic::on_unimplemented(
137 note = "`TryHeaderValue` is implemented for all `T` where `HeaderValue: TryFrom<T, Error: Into<isahc::http::Error>>`"
138)]
139pub trait TryHeaderValue {
140 fn try_header_value(self) -> Result<header::HeaderValue, Error>;
142}
143impl<U> TryHeaderValue for U
144where
145 header::HeaderValue: TryFrom<U>,
146 <header::HeaderValue as TryFrom<U>>::Error: Into<isahc::http::Error>,
147{
148 fn try_header_value(self) -> Result<header::HeaderValue, Error> {
149 header::HeaderValue::try_from(self).map_err(|e| e.into().into())
150 }
151}
152
153#[derive(Debug)]
157pub struct Request {
158 req: isahc::Request<Body>,
159 limits: ResponseLimits,
160}
161impl Request {
162 pub fn builder() -> RequestBuilder {
183 RequestBuilder::start(isahc::Request::builder())
184 }
185
186 pub fn get(uri: impl TryUri) -> Result<RequestBuilder, Error> {
198 Ok(RequestBuilder::start(isahc::Request::get(uri.try_uri()?)))
199 }
200
201 pub fn put(uri: impl TryUri) -> Result<RequestBuilder, Error> {
215 Ok(RequestBuilder::start(isahc::Request::put(uri.try_uri()?)))
216 }
217
218 pub fn post(uri: impl TryUri) -> Result<RequestBuilder, Error> {
232 Ok(RequestBuilder::start(isahc::Request::post(uri.try_uri()?)))
233 }
234
235 pub fn delete(uri: impl TryUri) -> Result<RequestBuilder, Error> {
249 Ok(RequestBuilder::start(isahc::Request::delete(uri.try_uri()?)))
250 }
251
252 pub fn patch(uri: impl TryUri) -> Result<RequestBuilder, Error> {
266 Ok(RequestBuilder::start(isahc::Request::patch(uri.try_uri()?)))
267 }
268
269 pub fn head(uri: impl TryUri) -> Result<RequestBuilder, Error> {
281 Ok(RequestBuilder::start(isahc::Request::head(uri.try_uri()?)))
282 }
283
284 pub fn uri(&self) -> &Uri {
286 self.req.uri()
287 }
288
289 pub fn method(&self) -> &Method {
291 self.req.method()
292 }
293
294 pub fn headers(&self) -> &header::HeaderMap {
296 self.req.headers()
297 }
298
299 pub fn clone_with(&self, body: impl TryBody) -> Result<Self, Error> {
301 let body = body.try_body()?;
302
303 let mut req = isahc::Request::new(body);
304 *req.method_mut() = self.req.method().clone();
305 *req.uri_mut() = self.req.uri().clone();
306 *req.version_mut() = self.req.version();
307 let headers = req.headers_mut();
308 for (name, value) in self.headers() {
309 headers.insert(name.clone(), value.clone());
310 }
311
312 Ok(Self {
313 req,
314 limits: self.limits.clone(),
315 })
316 }
317}
318
319#[derive(Debug, Default, Clone)]
320struct ResponseLimits {
321 max_length: Option<ByteLength>,
322 require_length: bool,
323}
324impl ResponseLimits {
325 fn check(&self, response: isahc::Response<isahc::AsyncBody>) -> Result<isahc::Response<isahc::AsyncBody>, Error> {
326 if self.require_length || self.max_length.is_some() {
327 let response = Response(response);
328 if let Some(len) = response.content_len() {
329 if let Some(max) = self.max_length
330 && max < len
331 {
332 return Err(Error::MaxLength {
333 content_length: Some(len),
334 max_length: max,
335 });
336 }
337 } else if self.require_length {
338 return Err(Error::RequireLength);
339 }
340
341 if let Some(max) = self.max_length {
342 let (parts, body) = response.0.into_parts();
343 let response = isahc::Response::from_parts(
344 parts,
345 isahc::AsyncBody::from_reader(super::io::ReadLimited::new(body, max, move || {
346 std::io::Error::new(std::io::ErrorKind::InvalidData, MaxLengthError(None, max))
347 })),
348 );
349
350 Ok(response)
351 } else {
352 Ok(response.0)
353 }
354 } else {
355 Ok(response)
356 }
357 }
358}
359
360#[derive(Debug)]
364pub struct RequestBuilder {
365 builder: isahc::http::request::Builder,
366 limits: ResponseLimits,
367}
368impl Default for RequestBuilder {
369 fn default() -> Self {
370 Request::builder()
371 }
372}
373impl RequestBuilder {
374 pub fn new() -> Self {
376 Request::builder()
377 }
378
379 fn start(builder: isahc::http::request::Builder) -> Self {
380 Self {
381 builder,
382 limits: ResponseLimits::default(),
383 }
384 }
385
386 pub fn method(self, method: impl TryMethod) -> Result<Self, Error> {
388 Ok(Self {
389 builder: self.builder.method(method.try_method()?),
390 limits: self.limits,
391 })
392 }
393
394 pub fn uri(self, uri: impl TryUri) -> Result<Self, Error> {
396 Ok(Self {
397 builder: self.builder.uri(uri.try_uri()?),
398 limits: self.limits,
399 })
400 }
401
402 pub fn header(self, name: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
404 Ok(Self {
405 builder: self.builder.header(name.try_header_name()?, value.try_header_value()?),
406 limits: self.limits,
407 })
408 }
409
410 pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
414 Self {
415 builder: self.builder.cookie_jar(cookie_jar),
416 limits: self.limits,
417 }
418 }
419
420 pub fn timeout(self, timeout: Duration) -> Self {
431 Self {
432 builder: self.builder.timeout(timeout),
433 limits: self.limits,
434 }
435 }
436
437 pub fn connect_timeout(self, timeout: Duration) -> Self {
441 Self {
442 builder: self.builder.connect_timeout(timeout),
443 limits: self.limits,
444 }
445 }
446
447 pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
451 Self {
452 builder: self.builder.low_speed_timeout(low_speed, timeout),
453 limits: self.limits,
454 }
455 }
456
457 pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
463 if !matches!(policy, RedirectPolicy::None) {
464 Self {
465 builder: self.builder.redirect_policy(policy).auto_referer(),
466 limits: self.limits,
467 }
468 } else {
469 Self {
470 builder: self.builder.redirect_policy(policy),
471 limits: self.limits,
472 }
473 }
474 }
475
476 pub fn auto_decompress(self, enabled: bool) -> Self {
484 Self {
485 builder: self.builder.automatic_decompression(enabled),
486 limits: self.limits,
487 }
488 }
489
490 pub fn max_upload_speed(self, max: u64) -> Self {
492 Self {
493 builder: self.builder.max_upload_speed(max),
494 limits: self.limits,
495 }
496 }
497
498 pub fn max_download_speed(self, max: u64) -> Self {
500 Self {
501 builder: self.builder.max_download_speed(max),
502 limits: self.limits,
503 }
504 }
505
506 pub fn max_length(mut self, max: ByteLength) -> Self {
516 self.limits.max_length = Some(max);
517 self
518 }
519
520 pub fn require_length(mut self, require: bool) -> Self {
522 self.limits.require_length = require;
523 self
524 }
525
526 pub fn metrics(self, enable: bool) -> Self {
532 Self {
533 builder: self.builder.metrics(enable),
534 limits: self.limits,
535 }
536 }
537
538 pub fn build(self) -> Request {
540 self.body(()).unwrap()
541 }
542
543 pub fn body(self, body: impl TryBody) -> Result<Request, Error> {
545 Ok(Request {
546 req: self.builder.body(body.try_body()?).unwrap(),
547 limits: self.limits,
548 })
549 }
550
551 pub fn build_custom<F>(self, custom: F) -> Result<Request, Error>
555 where
556 F: FnOnce(isahc::http::request::Builder) -> isahc::http::Result<isahc::Request<isahc::AsyncBody>>,
557 {
558 let req = custom(self.builder)?;
559 Ok(Request {
560 req: req.map(Body),
561 limits: self.limits,
562 })
563 }
564}
565
566pub type ResponseParts = isahc::http::response::Parts;
568
569#[derive(Debug)]
571pub struct Response(isahc::Response<isahc::AsyncBody>);
572impl Response {
573 pub fn status(&self) -> StatusCode {
575 self.0.status()
576 }
577
578 pub fn headers(&self) -> &header::HeaderMap<header::HeaderValue> {
580 self.0.headers()
581 }
582
583 pub fn content_len(&self) -> Option<ByteLength> {
585 self.0.body().len().map(|l| ByteLength(l as usize))
586 }
587
588 pub fn cookie_jar(&self) -> Option<&CookieJar> {
592 self.0.cookie_jar()
593 }
594
595 pub async fn text(&mut self) -> std::io::Result<Txt> {
597 self.0.text().await.map(Txt::from)
598 }
599
600 pub fn effective_uri(&self) -> Option<&Uri> {
604 self.0.effective_uri()
605 }
606
607 pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
609 Body::bytes_impl(self.0.body_mut()).await
610 }
611
612 pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
614 BufReader::new(self.0.body_mut()).read(buf).await
615 }
616
617 pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
619 BufReader::new(self.0.body_mut()).read_exact(buf).await
620 }
621
622 pub async fn json<O>(&mut self) -> Result<O, serde_json::Error>
624 where
625 O: serde::de::DeserializeOwned + std::marker::Unpin,
626 {
627 self.0.json().await
628 }
629
630 pub fn metrics(&self) -> Metrics {
635 self.0.metrics().map(Metrics::from_isahc).unwrap_or_else(Metrics::zero)
636 }
637
638 pub async fn consume(&mut self) -> std::io::Result<()> {
647 self.0.consume().await
648 }
649
650 pub fn new_message(status: impl Into<StatusCode>, msg: impl Into<String>) -> Self {
652 let status = status.into();
653 let msg = msg.into().into_bytes();
654 let msg = futures_lite::io::Cursor::new(msg);
655 let mut r = isahc::Response::new(isahc::AsyncBody::from_reader(msg));
656 *r.status_mut() = status;
657 Self(r)
658 }
659
660 pub fn new(status: StatusCode, headers: header::HeaderMap<header::HeaderValue>, body: Body) -> Self {
662 let mut r = isahc::Response::new(body.0);
663 *r.status_mut() = status;
664 *r.headers_mut() = headers;
665 Self(r)
666 }
667
668 pub fn into_parts(self) -> (ResponseParts, Body) {
670 let (p, b) = self.0.into_parts();
671 (p, Body(b))
672 }
673
674 pub fn from_parts(parts: ResponseParts, body: Body) -> Self {
676 Self(isahc::Response::from_parts(parts, body.0))
677 }
678}
679impl From<Response> for isahc::Response<isahc::AsyncBody> {
680 fn from(r: Response) -> Self {
681 r.0
682 }
683}
684
685#[derive(Debug, Default)]
689pub struct Body(isahc::AsyncBody);
690impl Body {
691 pub fn empty() -> Body {
695 Body(isahc::AsyncBody::empty())
696 }
697
698 pub fn from_bytes_static(bytes: impl AsRef<[u8]> + 'static) -> Self {
705 Body(isahc::AsyncBody::from_bytes_static(bytes))
706 }
707
708 pub fn from_reader(read: impl AsyncRead + Send + Sync + 'static) -> Self {
710 Body(isahc::AsyncBody::from_reader(read))
711 }
712
713 pub fn from_reader_sized(read: impl AsyncRead + Send + Sync + 'static, size: u64) -> Self {
715 Body(isahc::AsyncBody::from_reader_sized(read, size))
716 }
717
718 pub fn is_empty(&self) -> bool {
724 self.0.is_empty()
725 }
726
727 pub fn len(&self) -> Option<u64> {
729 self.0.len()
730 }
731
732 pub fn reset(&mut self) -> bool {
736 self.0.reset()
737 }
738
739 pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
741 Self::bytes_impl(&mut self.0).await
742 }
743 async fn bytes_impl(body: &mut isahc::AsyncBody) -> std::io::Result<Vec<u8>> {
744 let cap = body.len().unwrap_or(1024);
745 let mut bytes = Vec::with_capacity(cap as usize);
746 super::io::copy(body, &mut bytes).await?;
747 Ok(bytes)
748 }
749
750 pub async fn text_utf8(&mut self) -> Result<Txt, Box<dyn std::error::Error>> {
754 let bytes = self.bytes().await?;
755 let r = String::from_utf8(bytes)?;
756 Ok(Txt::from(r))
757 }
758
759 pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
761 BufReader::new(&mut self.0).read(buf).await
762 }
763
764 pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
766 BufReader::new(&mut self.0).read_exact(buf).await
767 }
768}
769impl From<Body> for isahc::AsyncBody {
770 fn from(r: Body) -> Self {
771 r.0
772 }
773}
774impl From<isahc::AsyncBody> for Body {
775 fn from(r: isahc::AsyncBody) -> Self {
776 Body(r)
777 }
778}
779impl From<()> for Body {
780 fn from(body: ()) -> Self {
781 Body(body.into())
782 }
783}
784impl From<String> for Body {
785 fn from(body: String) -> Self {
786 Body(body.into())
787 }
788}
789impl From<Txt> for Body {
790 fn from(body: Txt) -> Self {
791 Body(String::from(body).into())
792 }
793}
794impl From<Vec<u8>> for Body {
795 fn from(body: Vec<u8>) -> Self {
796 Body(body.into())
797 }
798}
799impl From<&'_ [u8]> for Body {
800 fn from(body: &[u8]) -> Self {
801 body.to_vec().into()
802 }
803}
804impl From<&'_ str> for Body {
805 fn from(body: &str) -> Self {
806 body.as_bytes().into()
807 }
808}
809impl<T: Into<Self>> From<Option<T>> for Body {
810 fn from(body: Option<T>) -> Self {
811 match body {
812 Some(body) => body.into(),
813 None => Self::empty(),
814 }
815 }
816}
817impl AsyncRead for Body {
818 fn poll_read(
819 self: std::pin::Pin<&mut Self>,
820 cx: &mut std::task::Context<'_>,
821 buf: &mut [u8],
822 ) -> std::task::Poll<std::io::Result<usize>> {
823 Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
824 }
825}
826
827pub async fn get(uri: impl TryUri) -> Result<Response, Error> {
831 default_client().get(uri).await
832}
833
834pub async fn get_txt(uri: impl TryUri) -> Result<Txt, Error> {
838 default_client().get_txt(uri).await
839}
840
841pub async fn get_bytes(uri: impl TryUri) -> Result<Vec<u8>, Error> {
845 default_client().get_bytes(uri).await
846}
847
848pub async fn get_json<O>(uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
852where
853 O: serde::de::DeserializeOwned + std::marker::Unpin,
854{
855 default_client().get_json(uri).await
856}
857
858pub async fn head(uri: impl TryUri) -> Result<Response, Error> {
862 default_client().head(uri).await
863}
864
865pub async fn put(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
869 default_client().put(uri, body).await
870}
871
872pub async fn post(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
876 default_client().post(uri, body).await
877}
878
879pub async fn delete(uri: impl TryUri) -> Result<Response, Error> {
883 default_client().delete(uri).await
884}
885
886pub async fn send(request: Request) -> Result<Response, Error> {
890 default_client().send(request).await
891}
892
893pub fn default_client() -> &'static Client {
903 use once_cell::sync::Lazy;
904
905 static SHARED: Lazy<Client> = Lazy::new(|| {
906 let ci = mem::replace(&mut *CLIENT_INIT.lock(), ClientInit::Inited);
907 if let ClientInit::Set(init) = ci {
908 init()
909 } else {
910 Client::new()
912 }
913 });
914 &SHARED
915}
916
917static CLIENT_INIT: Mutex<ClientInit> = const_mutex(ClientInit::None);
918
919enum ClientInit {
920 None,
921 Set(Box<dyn FnOnce() -> Client + Send>),
922 Inited,
923}
924
925pub fn set_default_client_init<I>(init: I) -> Result<(), DefaultAlreadyInitedError>
934where
935 I: FnOnce() -> Client + Send + 'static,
936{
937 let mut ci = CLIENT_INIT.lock();
938 if let ClientInit::Inited = &*ci {
939 Err(DefaultAlreadyInitedError {})
940 } else {
941 *ci = ClientInit::Set(Box::new(init));
942 Ok(())
943 }
944}
945
946#[derive(Debug, Clone, Copy)]
948#[non_exhaustive]
949pub struct DefaultAlreadyInitedError {}
950impl fmt::Display for DefaultAlreadyInitedError {
951 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
952 write!(f, "default client already initialized, can only set before first use")
953 }
954}
955impl std::error::Error for DefaultAlreadyInitedError {}
956
957#[derive(Debug, Clone, PartialEq, Eq)]
959#[non_exhaustive]
960pub struct Metrics {
961 pub upload_progress: (ByteLength, ByteLength),
963
964 pub upload_speed: ByteLength,
966
967 pub download_progress: (ByteLength, ByteLength),
969
970 pub download_speed: ByteLength,
972
973 pub name_lookup_time: Duration,
977
978 pub connect_time: Duration,
982
983 pub secure_connect_time: Duration,
987
988 pub transfer_start_time: Duration,
992
993 pub transfer_time: Duration,
998
999 pub total_time: Duration,
1004
1005 pub redirect_time: Duration,
1008}
1009impl Metrics {
1010 pub fn from_isahc(m: &isahc::Metrics) -> Self {
1012 Self {
1013 upload_progress: {
1014 let (c, t) = m.upload_progress();
1015 ((c as usize).bytes(), (t as usize).bytes())
1016 },
1017 upload_speed: (m.upload_speed().round() as usize).bytes(),
1018 download_progress: {
1019 let (c, t) = m.download_progress();
1020 ((c as usize).bytes(), (t as usize).bytes())
1021 },
1022 download_speed: (m.download_speed().round() as usize).bytes(),
1023 name_lookup_time: m.name_lookup_time(),
1024 connect_time: m.connect_time(),
1025 secure_connect_time: m.secure_connect_time(),
1026 transfer_start_time: m.transfer_start_time(),
1027 transfer_time: m.transfer_time(),
1028 total_time: m.total_time(),
1029 redirect_time: m.redirect_time(),
1030 }
1031 }
1032
1033 pub fn zero() -> Self {
1035 Self {
1036 upload_progress: (0.bytes(), 0.bytes()),
1037 upload_speed: 0.bytes(),
1038 download_progress: (0.bytes(), 0.bytes()),
1039 download_speed: 0.bytes(),
1040 name_lookup_time: Duration::ZERO,
1041 connect_time: Duration::ZERO,
1042 secure_connect_time: Duration::ZERO,
1043 transfer_start_time: Duration::ZERO,
1044 transfer_time: Duration::ZERO,
1045 total_time: Duration::ZERO,
1046 redirect_time: Duration::ZERO,
1047 }
1048 }
1049}
1050impl From<isahc::Metrics> for Metrics {
1051 fn from(m: isahc::Metrics) -> Self {
1052 Metrics::from_isahc(&m)
1053 }
1054}
1055impl fmt::Display for Metrics {
1056 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1057 let mut ws = false; if self.upload_progress.0 != self.upload_progress.1 {
1060 write!(
1061 f,
1062 "↑ {} - {}, {}/s",
1063 self.upload_progress.0, self.upload_progress.1, self.upload_speed
1064 )?;
1065 ws = true;
1066 }
1067 if self.download_progress.0 != self.download_progress.1 {
1068 write!(
1069 f,
1070 "{}↓ {} - {}, {}/s",
1071 if ws { "\n" } else { "" },
1072 self.download_progress.0,
1073 self.download_progress.1,
1074 self.download_speed
1075 )?;
1076 ws = true;
1077 }
1078
1079 if !ws {
1080 if self.upload_progress.1.bytes() > 0 {
1081 write!(f, "↑ {}", self.upload_progress.1)?;
1082 ws = true;
1083 }
1084 if self.download_progress.1.bytes() > 0 {
1085 write!(f, "{}↓ {}", if ws { "\n" } else { "" }, self.download_progress.1)?;
1086 ws = true;
1087 }
1088
1089 if ws {
1090 write!(f, "\n{:?}", self.total_time)?;
1091 }
1092 }
1093
1094 Ok(())
1095 }
1096}
1097impl_from_and_into_var! {
1098 fn from(metrics: Metrics) -> Progress {
1099 let mut status = Progress::indeterminate();
1100 if metrics.download_progress.1 > 0.bytes() {
1101 status = Progress::from_n_of(metrics.download_progress.0.0, metrics.download_progress.1.0);
1102 }
1103 if metrics.upload_progress.1 > 0.bytes() {
1104 let u_status = Progress::from_n_of(metrics.upload_progress.0.0, metrics.upload_progress.1.0);
1105 if status.is_indeterminate() {
1106 status = u_status;
1107 } else {
1108 status = status.and_fct(u_status.fct());
1109 }
1110 }
1111 status.with_msg(formatx!("{metrics}")).with_meta_mut(|mut m| {
1112 m.set(*METRICS_ID, metrics);
1113 })
1114 }
1115}
1116zng_state_map::static_id! {
1117 pub static ref METRICS_ID: zng_state_map::StateId<Metrics>;
1119}
1120
1121pub struct Client {
1125 client: isahc::HttpClient,
1126 cache: Option<Box<dyn CacheDb>>,
1127 cache_mode: Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>,
1128}
1129impl Default for Client {
1130 fn default() -> Self {
1131 Self::new()
1132 }
1133}
1134impl Clone for Client {
1135 fn clone(&self) -> Self {
1136 Client {
1137 client: self.client.clone(),
1138 cache: self.cache.as_ref().map(|b| b.clone_boxed()),
1139 cache_mode: self.cache_mode.clone(),
1140 }
1141 }
1142}
1143impl fmt::Debug for Client {
1144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1145 f.debug_struct("Client").finish_non_exhaustive()
1146 }
1147}
1148impl Client {
1149 pub fn new() -> Self {
1154 Client::builder()
1155 .cookies()
1156 .redirect_policy(RedirectPolicy::Limit(20))
1157 .connect_timeout(90.secs())
1158 .metrics(true)
1159 .build()
1160 }
1161
1162 pub fn builder() -> ClientBuilder {
1164 ClientBuilder {
1165 builder: isahc::HttpClient::builder(),
1166 cache: None,
1167 cache_mode: None,
1168 }
1169 }
1170
1171 pub fn cookie_jar(&self) -> Option<&CookieJar> {
1173 self.client.cookie_jar()
1174 }
1175
1176 pub async fn get(&self, uri: impl TryUri) -> Result<Response, Error> {
1178 self.send(Request::get(uri)?.build()).await
1179 }
1180
1181 pub async fn get_txt(&self, uri: impl TryUri) -> Result<Txt, Error> {
1183 let mut r = self.get(uri).await?;
1184 let r = r.text().await?;
1185 Ok(r)
1186 }
1187
1188 pub async fn get_bytes(&self, uri: impl TryUri) -> Result<Vec<u8>, Error> {
1190 let mut r = self.get(uri).await?;
1191 let r = r.bytes().await?;
1192 Ok(r)
1193 }
1194
1195 pub async fn get_json<O>(&self, uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
1197 where
1198 O: serde::de::DeserializeOwned + std::marker::Unpin,
1199 {
1200 let mut r = self.get(uri).await?;
1201 let r = r.json::<O>().await?;
1202 Ok(r)
1203 }
1204
1205 pub async fn head(&self, uri: impl TryUri) -> Result<Response, Error> {
1207 self.send(Request::head(uri)?.build()).await
1208 }
1209 pub async fn put(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1211 self.send(Request::put(uri)?.body(body)?).await
1212 }
1213
1214 pub async fn post(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1216 self.send(Request::post(uri)?.body(body)?).await
1217 }
1218
1219 pub async fn delete(&self, uri: impl TryUri) -> Result<Response, Error> {
1221 self.send(Request::delete(uri)?.build()).await
1222 }
1223
1224 pub async fn send(&self, request: Request) -> Result<Response, Error> {
1234 if let Some(db) = &self.cache {
1235 match self.cache_mode(&request) {
1236 CacheMode::NoCache => {
1237 let response = self.client.send_async(request.req).await?;
1238 let response = request.limits.check(response)?;
1239 Ok(Response(response))
1240 }
1241 CacheMode::Default => self.send_cache_default(&**db, request, 0).await,
1242 CacheMode::Permanent => self.send_cache_permanent(&**db, request, 0).await,
1243 CacheMode::Error(e) => Err(e),
1244 }
1245 } else {
1246 let response = self.client.send_async(request.req).await?;
1247 let response = request.limits.check(response)?;
1248 Ok(Response(response))
1249 }
1250 }
1251
1252 #[async_recursion::async_recursion]
1253 async fn send_cache_default(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
1254 if retry_count == 3 {
1255 tracing::error!("retried cache 3 times, skipping cache");
1256 let response = self.client.send_async(request.req).await?;
1257 let response = request.limits.check(response)?;
1258 return Ok(Response(response));
1259 }
1260
1261 let key = CacheKey::new(&request.req);
1262 if let Some(policy) = db.policy(&key).await {
1263 match policy.before_request(&request.req) {
1264 BeforeRequest::Fresh(parts) => {
1265 if let Some(body) = db.body(&key).await {
1266 let response = isahc::Response::from_parts(parts, body.0);
1267 let response = request.limits.check(response)?;
1268
1269 Ok(Response(response))
1270 } else {
1271 tracing::error!("cache returned policy but not body");
1272 db.remove(&key).await;
1273 self.send_cache_default(db, request, retry_count + 1).await
1274 }
1275 }
1276 BeforeRequest::Stale { request: parts, matches } => {
1277 if matches {
1278 let (_, body) = request.req.into_parts();
1279 let request = Request {
1280 req: isahc::Request::from_parts(parts, body),
1281 limits: request.limits,
1282 };
1283 let policy_request = request.clone_with(()).unwrap().req;
1284 let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);
1285
1286 let response = self.client.send_async(request.req).await?;
1287 let response = request.limits.check(response)?;
1288
1289 match policy.after_response(&policy_request, &response) {
1290 AfterResponse::NotModified(policy, parts) => {
1291 if let Some(body) = db.body(&key).await {
1292 let response = isahc::Response::from_parts(parts, body.0);
1293
1294 db.set_policy(&key, policy).await;
1295
1296 Ok(Response(response))
1297 } else {
1298 tracing::error!("cache returned policy but not body");
1299 db.remove(&key).await;
1300
1301 if no_req_body {
1302 self.send_cache_default(
1303 db,
1304 Request {
1305 req: policy_request,
1306 limits: request.limits,
1307 },
1308 retry_count + 1,
1309 )
1310 .await
1311 } else {
1312 Err(std::io::Error::new(
1313 std::io::ErrorKind::NotFound,
1314 "cache returned policy but not body, cannot auto-retry",
1315 )
1316 .into())
1317 }
1318 }
1319 }
1320 AfterResponse::Modified(policy, parts) => {
1321 if policy.should_store() {
1322 let (_, body) = response.into_parts();
1323 if let Some(body) = db.set(&key, policy, Body(body)).await {
1324 let response = isahc::Response::from_parts(parts, body.0);
1325
1326 Ok(Response(response))
1327 } else {
1328 tracing::error!("cache db failed to store body");
1329 db.remove(&key).await;
1330
1331 if no_req_body {
1332 self.send_cache_default(
1333 db,
1334 Request {
1335 req: policy_request,
1336 limits: request.limits,
1337 },
1338 retry_count + 1,
1339 )
1340 .await
1341 } else {
1342 Err(std::io::Error::new(
1343 std::io::ErrorKind::NotFound,
1344 "cache db failed to store body, cannot auto-retry",
1345 )
1346 .into())
1347 }
1348 }
1349 } else {
1350 db.remove(&key).await;
1351
1352 Ok(Response(response))
1353 }
1354 }
1355 }
1356 } else {
1357 tracing::error!("cache policy did not match request, {request:?}");
1358 db.remove(&key).await;
1359 let response = self.client.send_async(request.req).await?;
1360 let response = request.limits.check(response)?;
1361 Ok(Response(response))
1362 }
1363 }
1364 }
1365 } else {
1366 let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);
1367 let policy_request = request.clone_with(()).unwrap().req;
1368
1369 let response = self.client.send_async(request.req).await?;
1370 let response = request.limits.check(response)?;
1371
1372 let policy = CachePolicy::new(&policy_request, &response);
1373
1374 if policy.should_store() {
1375 let (parts, body) = response.into_parts();
1376
1377 if let Some(body) = db.set(&key, policy, Body(body)).await {
1378 let response = isahc::Response::from_parts(parts, body.0);
1379
1380 Ok(Response(response))
1381 } else {
1382 tracing::error!("cache db failed to store body");
1383 db.remove(&key).await;
1384
1385 if no_req_body {
1386 self.send_cache_default(
1387 db,
1388 Request {
1389 req: policy_request,
1390 limits: request.limits,
1391 },
1392 retry_count + 1,
1393 )
1394 .await
1395 } else {
1396 Err(std::io::Error::new(std::io::ErrorKind::NotFound, "cache db failed to store body, cannot auto-retry").into())
1397 }
1398 }
1399 } else {
1400 Ok(Response(response))
1401 }
1402 }
1403 }
1404
1405 #[async_recursion::async_recursion]
1406 async fn send_cache_permanent(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
1407 if retry_count == 3 {
1408 tracing::error!("retried cache 3 times, skipping cache");
1409 let response = self.client.send_async(request.req).await?;
1410 let response = request.limits.check(response)?;
1411 return Ok(Response(response));
1412 }
1413
1414 let key = CacheKey::new(&request.req);
1415 if let Some(policy) = db.policy(&key).await {
1416 if let Some(body) = db.body(&key).await {
1417 match policy.before_request(&request.req) {
1418 BeforeRequest::Fresh(p) => {
1419 let response = isahc::Response::from_parts(p, body.0);
1420 let response = request.limits.check(response)?;
1421
1422 if !policy.is_permanent() {
1423 db.set_policy(&key, CachePolicy::new_permanent(&response)).await;
1424 }
1425
1426 Ok(Response(response))
1427 }
1428 BeforeRequest::Stale { request: parts, .. } => {
1429 let limits = request.limits.clone();
1432
1433 let (_, req_body) = request.req.into_parts();
1434 let request = isahc::Request::from_parts(parts, req_body);
1435
1436 let response = self.client.send_async(request).await?;
1437 let response = limits.check(response)?;
1438
1439 let (parts, _) = response.into_parts();
1440
1441 let response = isahc::Response::from_parts(parts, body.0);
1442
1443 db.set_policy(&key, CachePolicy::new_permanent(&response)).await;
1444
1445 Ok(Response(response))
1446 }
1447 }
1448 } else {
1449 tracing::error!("cache returned policy but not body");
1450 db.remove(&key).await;
1451 self.send_cache_permanent(db, request, retry_count + 1).await
1452 }
1453 } else {
1454 let backup_request = if request.req.body().len().map(|l| l == 0).unwrap_or(false) {
1455 Some(request.clone_with(()).unwrap())
1456 } else {
1457 None
1458 };
1459
1460 let response = self.client.send_async(request.req).await?;
1461 let response = request.limits.check(response)?;
1462 let policy = CachePolicy::new_permanent(&response);
1463
1464 let (parts, body) = response.into_parts();
1465
1466 if let Some(body) = db.set(&key, policy, Body(body)).await {
1467 let response = isahc::Response::from_parts(parts, body.0);
1468 Ok(Response(response))
1469 } else {
1470 tracing::error!("cache db failed to store body");
1471 db.remove(&key).await;
1472
1473 if let Some(request) = backup_request {
1474 self.send_cache_permanent(db, request, retry_count + 1).await
1475 } else {
1476 Err(std::io::Error::new(
1477 std::io::ErrorKind::NotFound,
1478 "cache db failed to store permanent body, cannot auto-retry",
1479 )
1480 .into())
1481 }
1482 }
1483 }
1484 }
1485
1486 pub fn cache(&self) -> Option<&dyn CacheDb> {
1488 self.cache.as_deref()
1489 }
1490
1491 pub fn cache_mode(&self, request: &Request) -> CacheMode {
1493 if self.cache.is_none() || request.method() != Method::GET {
1494 CacheMode::NoCache
1495 } else {
1496 (self.cache_mode)(request)
1497 }
1498 }
1499}
1500impl From<Client> for isahc::HttpClient {
1501 fn from(c: Client) -> Self {
1502 c.client
1503 }
1504}
1505impl From<isahc::HttpClient> for Client {
1506 fn from(client: isahc::HttpClient) -> Self {
1507 Self {
1508 client,
1509 cache: None,
1510 cache_mode: Arc::new(|_| CacheMode::default()),
1511 }
1512 }
1513}
1514
1515pub struct ClientBuilder {
1527 builder: isahc::HttpClientBuilder,
1528 cache: Option<Box<dyn CacheDb>>,
1529 cache_mode: Option<Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>>,
1530}
1531impl Default for ClientBuilder {
1532 fn default() -> Self {
1533 Client::builder()
1534 }
1535}
1536impl ClientBuilder {
1537 pub fn new() -> Self {
1539 Client::builder()
1540 }
1541
1542 pub fn build(self) -> Client {
1544 Client {
1545 client: self.builder.build().unwrap(),
1546 cache: self.cache,
1547 cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1548 }
1549 }
1550
1551 pub fn build_custom<F>(self, custom: F) -> Result<Client, Error>
1555 where
1556 F: FnOnce(isahc::HttpClientBuilder) -> Result<isahc::HttpClient, Error>,
1557 {
1558 custom(self.builder).map(|c| Client {
1559 client: c,
1560 cache: self.cache,
1561 cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1562 })
1563 }
1564
1565 pub fn default_header(self, key: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
1567 Ok(Self {
1568 builder: self.builder.default_header(key.try_header_name()?, value.try_header_value()?),
1569 cache: self.cache,
1570 cache_mode: self.cache_mode,
1571 })
1572 }
1573
1574 pub fn cookies(self) -> Self {
1576 Self {
1577 builder: self.builder.cookies(),
1578 cache: self.cache,
1579 cache_mode: self.cache_mode,
1580 }
1581 }
1582
1583 pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
1587 Self {
1588 builder: self.builder.cookie_jar(cookie_jar),
1589 cache: self.cache,
1590 cache_mode: self.cache_mode,
1591 }
1592 }
1593
1594 pub fn timeout(self, timeout: Duration) -> Self {
1605 Self {
1606 builder: self.builder.timeout(timeout),
1607 cache: self.cache,
1608 cache_mode: self.cache_mode,
1609 }
1610 }
1611
1612 pub fn connect_timeout(self, timeout: Duration) -> Self {
1616 Self {
1617 builder: self.builder.connect_timeout(timeout),
1618 cache: self.cache,
1619 cache_mode: self.cache_mode,
1620 }
1621 }
1622
1623 pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
1627 Self {
1628 builder: self.builder.low_speed_timeout(low_speed, timeout),
1629 cache: self.cache,
1630 cache_mode: self.cache_mode,
1631 }
1632 }
1633
1634 pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
1638 if !matches!(policy, RedirectPolicy::None) {
1639 Self {
1640 builder: self.builder.redirect_policy(policy).auto_referer(),
1641 cache: self.cache,
1642 cache_mode: self.cache_mode,
1643 }
1644 } else {
1645 Self {
1646 builder: self.builder.redirect_policy(policy),
1647 cache: self.cache,
1648 cache_mode: self.cache_mode,
1649 }
1650 }
1651 }
1652
1653 pub fn auto_decompress(self, enabled: bool) -> Self {
1661 Self {
1662 builder: self.builder.automatic_decompression(enabled),
1663 cache: self.cache,
1664 cache_mode: self.cache_mode,
1665 }
1666 }
1667
1668 pub fn max_upload_speed(self, max: u64) -> Self {
1670 Self {
1671 builder: self.builder.max_upload_speed(max),
1672 cache: self.cache,
1673 cache_mode: self.cache_mode,
1674 }
1675 }
1676
1677 pub fn max_download_speed(self, max: u64) -> Self {
1679 Self {
1680 builder: self.builder.max_download_speed(max),
1681 cache: self.cache,
1682 cache_mode: self.cache_mode,
1683 }
1684 }
1685
1686 pub fn metrics(self, enable: bool) -> Self {
1692 Self {
1693 builder: self.builder.metrics(enable),
1694 cache: self.cache,
1695 cache_mode: self.cache_mode,
1696 }
1697 }
1698
1699 pub fn cache(self, cache: impl CacheDb) -> Self {
1703 Self {
1704 builder: self.builder,
1705 cache: Some(Box::new(cache)),
1706 cache_mode: self.cache_mode,
1707 }
1708 }
1709
1710 pub fn cache_mode(self, selector: impl Fn(&Request) -> CacheMode + Send + Sync + 'static) -> Self {
1719 Self {
1720 builder: self.builder,
1721 cache: self.cache,
1722 cache_mode: Some(Arc::new(selector)),
1723 }
1724 }
1725}
1726
1727#[derive(Debug, Clone)]
1729#[non_exhaustive]
1730pub enum Error {
1731 Client(isahc::Error),
1733 MaxLength {
1737 content_length: Option<ByteLength>,
1739 max_length: ByteLength,
1741 },
1742 RequireLength,
1746}
1747impl StdError for Error {
1748 fn source(&self) -> Option<&(dyn StdError + 'static)> {
1749 match self {
1750 Error::Client(e) => Some(e),
1751 _ => None,
1752 }
1753 }
1754}
1755impl From<isahc::Error> for Error {
1756 fn from(e: isahc::Error) -> Self {
1757 if let Some(e) = e
1758 .source()
1759 .and_then(|e| e.downcast_ref::<std::io::Error>())
1760 .and_then(|e| e.get_ref())
1761 {
1762 if let Some(e) = e.downcast_ref::<MaxLengthError>() {
1763 return Error::MaxLength {
1764 content_length: e.0,
1765 max_length: e.1,
1766 };
1767 }
1768 if e.downcast_ref::<RequireLengthError>().is_some() {
1769 return Error::RequireLength;
1770 }
1771 }
1772 Error::Client(e)
1773 }
1774}
1775impl From<isahc::http::Error> for Error {
1776 fn from(e: isahc::http::Error) -> Self {
1777 isahc::Error::from(e).into()
1778 }
1779}
1780impl From<std::io::Error> for Error {
1781 fn from(e: std::io::Error) -> Self {
1782 isahc::Error::from(e).into()
1783 }
1784}
1785impl fmt::Display for Error {
1786 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1787 match self {
1788 Error::Client(e) => write!(f, "{e}"),
1789 Error::MaxLength {
1790 content_length,
1791 max_length,
1792 } => write!(f, "{}", MaxLengthError(*content_length, *max_length)),
1793 Error::RequireLength => write!(f, "{}", RequireLengthError {}),
1794 }
1795 }
1796}
1797
1798#[derive(Debug)]
1801struct MaxLengthError(Option<ByteLength>, ByteLength);
1802impl fmt::Display for MaxLengthError {
1803 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1804 if let Some(l) = self.0 {
1805 write!(f, "content-length of {l} exceeds limit of {}", self.1)
1806 } else {
1807 write!(f, "download reached limit of {}", self.1)
1808 }
1809 }
1810}
1811impl StdError for MaxLengthError {}
1812
1813#[derive(Debug)]
1814#[non_exhaustive]
1815struct RequireLengthError {}
1816impl fmt::Display for RequireLengthError {
1817 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1818 write!(f, "content-length is required")
1819 }
1820}
1821impl StdError for RequireLengthError {}