1use std::{
2 collections::BTreeMap,
3 io::Write,
4 path::Path,
5 sync::{Arc, OnceLock, RwLock},
6 time::Duration,
7};
8
9use bytes::Bytes;
10use reqwest::{Method, StatusCode, header};
11use serde::{Serialize, de::DeserializeOwned};
12use serde_json::Value;
13use tokio::time;
14
15use super::{AuthInjector, Error};
16use crate::{CliCoreError, Result};
17
/// Upper bound on attempts made by the retrying send helpers (the first try counts).
const MAX_RETRIES: usize = 3;
/// Delay before the first retry; the retry loops double it for each further attempt.
const BASE_BACKOFF: Duration = Duration::from_millis(500);
/// User agent used until `set_default_user_agent` installs another value.
const BUILTIN_DEFAULT_USER_AGENT: &str = "cli/dev";
/// Process-wide default user agent, lazily initialised with the built-in value on first access.
static DEFAULT_USER_AGENT: OnceLock<RwLock<String>> = OnceLock::new();
22
23pub fn set_default_user_agent(user_agent: impl Into<String>) {
31 let lock =
32 DEFAULT_USER_AGENT.get_or_init(|| RwLock::new(BUILTIN_DEFAULT_USER_AGENT.to_owned()));
33 if let Ok(mut current) = lock.write() {
34 *current = user_agent.into();
35 }
36}
37
38pub(crate) fn default_user_agent() -> String {
44 DEFAULT_USER_AGENT
45 .get_or_init(|| RwLock::new(BUILTIN_DEFAULT_USER_AGENT.to_owned()))
46 .read()
47 .map_or_else(
48 |_| BUILTIN_DEFAULT_USER_AGENT.to_owned(),
49 |value| value.clone(),
50 )
51}
52
/// Test-only mutex. NOTE(review): presumably held by tests that change the
/// global default user agent so they do not interleave — confirm against the tests.
#[cfg(test)]
pub(crate) static UA_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
58
/// Test-only guard that resets the default user agent to the built-in value
/// when it is dropped.
#[cfg(test)]
pub(crate) struct RestoreDefaultUserAgent;

#[cfg(test)]
impl Drop for RestoreDefaultUserAgent {
    /// Restores `BUILTIN_DEFAULT_USER_AGENT` as the process-wide default.
    fn drop(&mut self) {
        set_default_user_agent(BUILTIN_DEFAULT_USER_AGENT);
    }
}
72
/// Process-wide default transport logger, created on first access.
static DEFAULT_TRANSPORT_LOGGER: OnceLock<RwLock<Arc<dyn TransportLogger>>> = OnceLock::new();

/// Returns the lock guarding the default logger, initialising it with a
/// `NoopTransportLogger` the first time it is requested.
fn default_transport_logger_lock() -> &'static RwLock<Arc<dyn TransportLogger>> {
    DEFAULT_TRANSPORT_LOGGER.get_or_init(|| RwLock::new(Arc::new(NoopTransportLogger)))
}
78
79pub fn set_default_transport_logger(logger: Arc<dyn TransportLogger>) {
91 let mut current = default_transport_logger_lock()
95 .write()
96 .unwrap_or_else(std::sync::PoisonError::into_inner);
97 *current = logger;
98}
99
100#[must_use]
104pub fn default_transport_logger() -> Arc<dyn TransportLogger> {
105 default_transport_logger_lock()
106 .read()
107 .unwrap_or_else(std::sync::PoisonError::into_inner)
108 .clone()
109}
110
111pub fn debug_log_reqwest_request(request: &reqwest::Request) {
120 let logger = default_transport_logger();
121 if !logger.enabled() {
122 return;
123 }
124 logger.debug(&TransportLogEvent {
125 message: "http request",
126 fields: BTreeMap::from([
127 ("method".to_owned(), request.method().as_str().to_owned()),
128 ("url".to_owned(), request.url().as_str().to_owned()),
129 ]),
130 headers: Some(header_pairs(request.headers())),
131 body: request
132 .body()
133 .and_then(reqwest::Body::as_bytes)
134 .map(<[u8]>::to_vec),
135 });
136}
137
138pub fn debug_log_reqwest_response(status: StatusCode, headers: &header::HeaderMap, body: &[u8]) {
146 let logger = default_transport_logger();
147 if !logger.enabled() {
148 return;
149 }
150 logger.debug(&TransportLogEvent {
151 message: "http response",
152 fields: BTreeMap::from([("status".to_owned(), status.as_u16().to_string())]),
153 headers: Some(header_pairs(headers)),
154 body: Some(body.to_vec()),
155 });
156}
157
/// Test-only mutex. NOTE(review): presumably held by tests that swap the
/// global default transport logger so they do not interleave — confirm against the tests.
#[cfg(test)]
pub(crate) static TRANSPORT_LOGGER_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
163
/// Test-only guard that reinstalls a `NoopTransportLogger` as the default
/// transport logger when it is dropped.
#[cfg(test)]
pub(crate) struct RestoreDefaultTransportLogger;

#[cfg(test)]
impl Drop for RestoreDefaultTransportLogger {
    /// Resets the process-wide default logger to the no-op implementation.
    fn drop(&mut self) {
        set_default_transport_logger(Arc::new(NoopTransportLogger));
    }
}
177
/// One entry of a GraphQL response's `errors` array; only `message` is read.
#[derive(serde::Deserialize)]
struct GraphQlError {
    /// Error text reported by the server.
    message: String,
}
182
/// Top-level shape of a GraphQL response as consumed by this client.
#[derive(Default, serde::Deserialize)]
struct GraphQlEnvelope {
    /// The `data` payload, if the response carried one.
    data: Option<Value>,
    /// Reported errors; an absent `errors` key deserializes to an empty list.
    #[serde(default)]
    errors: Vec<GraphQlError>,
}
189
/// A single debug event handed to a `TransportLogger`.
#[derive(Clone, Debug, Default)]
pub struct TransportLogEvent {
    /// Short static description, e.g. "http request" or "http response".
    pub message: &'static str,
    /// Key/value details such as method, url, status, attempt or backoff.
    pub fields: BTreeMap<String, String>,
    /// Header name/value pairs, when the event describes a request or response.
    pub headers: Option<Vec<(String, String)>>,
    /// Raw body bytes, when they were captured for the event.
    pub body: Option<Vec<u8>>,
}
210
/// Sink for transport debug events.
pub trait TransportLogger: Send + Sync + std::fmt::Debug {
    /// Receives one debug event.
    fn debug(&self, event: &TransportLogEvent);

    /// Reports whether events should be produced at all. Call sites in this
    /// file skip building events when this returns `false`. Defaults to `true`.
    fn enabled(&self) -> bool {
        true
    }
}
226
/// Logger that discards every event and reports itself as disabled.
#[derive(Clone, Debug, Default)]
pub struct NoopTransportLogger;

impl TransportLogger for NoopTransportLogger {
    /// Ignores the event.
    fn debug(&self, _event: &TransportLogEvent) {}

    /// Always `false`, so callers can skip building events.
    fn enabled(&self) -> bool {
        false
    }
}
238
/// HTTP client bound to a base URL, with auth injection, a fixed user agent,
/// default headers and debug logging.
#[derive(Clone, Debug)]
pub struct HttpClient {
    /// Underlying reqwest client used to build and execute requests.
    base: reqwest::Client,
    /// Prefix concatenated verbatim with each request `path`.
    base_url: String,
    /// Hook applied to every built request before it is sent.
    auth: Arc<dyn AuthInjector>,
    /// Value sent in the `User-Agent` header.
    user_agent: String,
    /// Extra headers added to every request.
    default_headers: BTreeMap<String, String>,
    /// Destination for request/response/retry debug events.
    logger: Arc<dyn TransportLogger>,
}
254
/// Builder collecting the settings of an `HttpClient`.
#[derive(Clone, Debug)]
pub struct HttpClientBuilder {
    /// Prefix concatenated verbatim with each request `path`.
    base_url: String,
    /// Hook applied to every built request before it is sent.
    auth: Arc<dyn AuthInjector>,
    /// Value sent in the `User-Agent` header.
    user_agent: String,
    /// Extra headers added to every request.
    default_headers: BTreeMap<String, String>,
    /// Destination for debug events.
    logger: Arc<dyn TransportLogger>,
}
264
265impl HttpClientBuilder {
266 #[must_use]
268 pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
269 Self {
270 base_url: base_url.into(),
271 auth,
272 user_agent: default_user_agent(),
273 default_headers: BTreeMap::new(),
274 logger: default_transport_logger(),
275 }
276 }
277
278 #[must_use]
280 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
281 self.user_agent = user_agent.into();
282 self
283 }
284
285 #[must_use]
287 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Self {
288 self.user_agent(user_agent)
289 }
290
291 #[must_use]
293 pub fn default_headers(mut self, headers: BTreeMap<String, String>) -> Self {
294 self.default_headers = headers;
295 self
296 }
297
298 #[must_use]
300 pub fn with_default_headers(self, headers: BTreeMap<String, String>) -> Self {
301 self.default_headers(headers)
302 }
303
304 #[must_use]
306 pub fn logger(mut self, logger: Arc<dyn TransportLogger>) -> Self {
307 self.logger = logger;
308 self
309 }
310
311 #[must_use]
313 pub fn with_logger(self, logger: Arc<dyn TransportLogger>) -> Self {
314 self.logger(logger)
315 }
316
317 #[must_use]
319 pub fn build(self) -> HttpClient {
320 HttpClient {
321 base: reqwest::Client::new(),
322 base_url: self.base_url,
323 auth: self.auth,
324 user_agent: self.user_agent,
325 default_headers: self.default_headers,
326 logger: self.logger,
327 }
328 }
329}
330
331impl HttpClient {
    /// Returns an `HttpClientBuilder` for `base_url` and `auth`.
    #[must_use]
    pub fn builder(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> HttpClientBuilder {
        HttpClientBuilder::new(base_url, auth)
    }
337
    /// Builds a client for `base_url` and `auth` with the builder's defaults.
    #[must_use]
    pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
        HttpClientBuilder::new(base_url, auth).build()
    }
343
    /// Sends a GET to `path` (with retries) and decodes the JSON response into `T`.
    ///
    /// # Errors
    /// Fails on transport errors, error statuses, or undecodable bodies.
    pub async fn get<T: Default + DeserializeOwned>(&self, path: &str) -> Result<T> {
        self.do_json(Method::GET, path, Option::<&()>::None).await
    }
348
    /// Sends a GET to `path` (with retries) and only checks that the status is
    /// not a client or server error.
    pub async fn get_without_response(&self, path: &str) -> Result<()> {
        self.do_empty(Method::GET, path, Option::<&()>::None).await
    }
353
    /// Sends `body` as JSON in a POST to `path` and decodes the JSON response into `T`.
    ///
    /// # Errors
    /// Fails on serialization or transport errors, error statuses, or undecodable bodies.
    pub async fn post<B: Serialize, T: Default + DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
    ) -> Result<T> {
        self.do_json(Method::POST, path, Some(body)).await
    }
362
    /// Sends `body` as JSON in a POST to `path` and only checks the status.
    pub async fn post_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::POST, path, Some(body)).await
    }
367
    /// Sends `body` as JSON in a PUT to `path` and decodes the JSON response into `T`.
    ///
    /// # Errors
    /// Fails on serialization or transport errors, error statuses, or undecodable bodies.
    pub async fn put<B: Serialize, T: Default + DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
    ) -> Result<T> {
        self.do_json(Method::PUT, path, Some(body)).await
    }
376
    /// Sends `body` as JSON in a PUT to `path` and only checks the status.
    pub async fn put_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::PUT, path, Some(body)).await
    }
381
    /// Sends `body` as JSON in a PATCH to `path` and decodes the JSON response into `T`.
    ///
    /// # Errors
    /// Fails on serialization or transport errors, error statuses, or undecodable bodies.
    pub async fn patch<B: Serialize, T: Default + DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
    ) -> Result<T> {
        self.do_json(Method::PATCH, path, Some(body)).await
    }
390
    /// Sends `body` as JSON in a PATCH to `path` and only checks the status.
    pub async fn patch_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::PATCH, path, Some(body)).await
    }
395
    /// Sends a body-less DELETE to `path` (with retries) and only checks the status.
    pub async fn delete(&self, path: &str) -> Result<()> {
        self.do_empty(Method::DELETE, path, Option::<&()>::None)
            .await
    }
401
    /// Sends a DELETE to `path` carrying `body` as JSON and only checks the status.
    pub async fn delete_with_body<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::DELETE, path, Some(body)).await
    }
406
407 pub async fn get_etag<T: Default + DeserializeOwned>(&self, path: &str) -> Result<(T, String)> {
409 let response = self.send_get_status_only_retry(path).await?;
410 let etag = response
411 .headers()
412 .get(header::ETAG)
413 .and_then(|value| value.to_str().ok())
414 .unwrap_or_default()
415 .to_owned();
416 let value = self.decode_json_response(response, "GET", path).await?;
417 Ok((value, etag))
418 }
419
420 pub async fn get_etag_without_response(&self, path: &str) -> Result<String> {
422 let response = self.send_get_status_only_retry(path).await?;
423 let etag = response
424 .headers()
425 .get(header::ETAG)
426 .and_then(|value| value.to_str().ok())
427 .unwrap_or_default()
428 .to_owned();
429 self.ensure_success_response(response, "GET", path).await?;
430 Ok(etag)
431 }
432
    /// Sends `body` as JSON in a single (non-retried) PUT to `path` with an
    /// `If-Match: etag` header and decodes the JSON response into `T`.
    pub async fn put_if_match<B: Serialize, T: Default + DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
        etag: &str,
    ) -> Result<T> {
        let response = self.send_put_if_match(path, body, etag).await?;
        self.decode_json_response(response, "PUT", path).await
    }
443
    /// Like `put_if_match`, but only checks that the status is not an error.
    pub async fn put_if_match_without_response<B: Serialize>(
        &self,
        path: &str,
        body: &B,
        etag: &str,
    ) -> Result<()> {
        let response = self.send_put_if_match(path, body, etag).await?;
        self.ensure_success_response(response, "PUT", path).await
    }
454
455 pub async fn get_raw(&self, path: &str, writer: &mut dyn Write) -> Result<()> {
457 let response = self.send_get_raw_status_only_retry(path).await?;
458 let (status, bytes) = self
459 .read_and_log_response(response, "GET", path, false)
460 .await?;
461 if status.is_client_error() || status.is_server_error() {
462 return Err(
463 parse_error_body(status, &String::from_utf8_lossy(&bytes), "GET", path).into(),
464 );
465 }
466 writer.write_all(&bytes)?;
467 Ok(())
468 }
469
470 pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
472 let response = self.send_get_raw_status_only_retry(path).await?;
473 let (status, bytes) = self
474 .read_and_log_response(response, "GET", path, false)
475 .await?;
476 if status.is_client_error() || status.is_server_error() {
477 return Err(
478 parse_error_body(status, &String::from_utf8_lossy(&bytes), "GET", path).into(),
479 );
480 }
481 Ok(bytes.to_vec())
482 }
483
484 pub async fn post_raw<B: Serialize>(
486 &self,
487 path: &str,
488 body: Option<&B>,
489 writer: &mut dyn Write,
490 ) -> Result<()> {
491 let response = self.send_post_raw_once(path, body).await?;
492 let (status, bytes) = self
493 .read_and_log_response(response, "POST", path, false)
494 .await?;
495 if status.is_client_error() || status.is_server_error() {
496 return Err(
497 parse_error_body(status, &String::from_utf8_lossy(&bytes), "POST", path).into(),
498 );
499 }
500 writer.write_all(&bytes)?;
501 Ok(())
502 }
503
    /// Sends `body` verbatim with the given `method` and `content_type` (no
    /// retry) and decodes the JSON response into `T`.
    pub async fn do_raw<T: Default + DeserializeOwned>(
        &self,
        method: Method,
        path: &str,
        content_type: &str,
        body: impl Into<Vec<u8>>,
    ) -> Result<T> {
        self.do_raw_optional_body(method, path, content_type, Some(body.into()))
            .await
    }
515
516 pub async fn do_raw_optional_body<T: Default + DeserializeOwned>(
518 &self,
519 method: Method,
520 path: &str,
521 content_type: &str,
522 body: Option<Vec<u8>>,
523 ) -> Result<T> {
524 let method_text = method.as_str().to_owned();
525 let response = self.send_raw_once(method, path, content_type, body).await?;
526 self.decode_json_response(response, &method_text, path)
527 .await
528 }
529
    /// Sends `body` verbatim with the given `method` and `content_type` (no
    /// retry) and only checks that the status is not an error.
    pub async fn do_raw_without_response(
        &self,
        method: Method,
        path: &str,
        content_type: &str,
        body: impl Into<Vec<u8>>,
    ) -> Result<()> {
        self.do_raw_optional_body_without_response(method, path, content_type, Some(body.into()))
            .await
    }
541
542 pub async fn do_raw_optional_body_without_response(
544 &self,
545 method: Method,
546 path: &str,
547 content_type: &str,
548 body: Option<Vec<u8>>,
549 ) -> Result<()> {
550 let method_text = method.as_str().to_owned();
551 let response = self.send_raw_once(method, path, content_type, body).await?;
552 self.ensure_success_response(response, &method_text, path)
553 .await
554 }
555
    /// Uploads the file at `file_path` as multipart field `field_name` with no
    /// extra text fields, decoding the JSON response into `T`.
    pub async fn post_multipart<T: Default + DeserializeOwned>(
        &self,
        path: &str,
        field_name: &str,
        file_path: &Path,
    ) -> Result<T> {
        self.post_multipart_with_fields(path, field_name, file_path, &BTreeMap::new())
            .await
    }
566
    /// Uploads the file at `file_path` as multipart field `field_name` with no
    /// extra text fields, only checking the response status.
    pub async fn post_multipart_without_response(
        &self,
        path: &str,
        field_name: &str,
        file_path: &Path,
    ) -> Result<()> {
        self.post_multipart_with_fields_without_response(
            path,
            field_name,
            file_path,
            &BTreeMap::new(),
        )
        .await
    }
582
    /// Uploads the file at `file_path` as multipart field `file_field` along
    /// with the text `fields`, decoding the JSON response into `T`.
    pub async fn post_multipart_with_fields<T: Default + DeserializeOwned>(
        &self,
        path: &str,
        file_field: &str,
        file_path: &Path,
        fields: &BTreeMap<String, String>,
    ) -> Result<T> {
        let form = self.multipart_form(file_field, file_path, fields).await?;
        self.send_multipart(path, form).await
    }
594
595 async fn multipart_form(
596 &self,
597 file_field: &str,
598 file_path: &Path,
599 fields: &BTreeMap<String, String>,
600 ) -> Result<reqwest::multipart::Form> {
601 let mut form = reqwest::multipart::Form::new();
602 for (key, value) in fields {
603 form = form.text(key.clone(), value.clone());
604 }
605 let file_name = file_path
606 .file_name()
607 .and_then(|name| name.to_str())
608 .unwrap_or("file")
609 .to_owned();
610 let bytes = tokio::fs::read(file_path)
611 .await
612 .map_err(|err| CliCoreError::message(format!("transport: open file: {err}")))?;
613 let part = reqwest::multipart::Part::bytes(bytes).file_name(file_name);
614 form = form.part(file_field.to_owned(), part);
615 Ok(form)
616 }
617
    /// Uploads the file at `file_path` as multipart field `file_field` along
    /// with the text `fields`, only checking the response status.
    pub async fn post_multipart_with_fields_without_response(
        &self,
        path: &str,
        file_field: &str,
        file_path: &Path,
        fields: &BTreeMap<String, String>,
    ) -> Result<()> {
        let form = self.multipart_form(file_field, file_path, fields).await?;
        self.send_multipart_without_response(path, form).await
    }
629
630 pub async fn post_multipart_fields<T: Default + DeserializeOwned>(
632 &self,
633 path: &str,
634 fields: &BTreeMap<String, String>,
635 ) -> Result<T> {
636 let mut form = reqwest::multipart::Form::new();
637 for (key, value) in fields {
638 form = form.text(key.clone(), value.clone());
639 }
640 self.send_multipart(path, form).await
641 }
642
643 pub async fn post_multipart_fields_without_response(
645 &self,
646 path: &str,
647 fields: &BTreeMap<String, String>,
648 ) -> Result<()> {
649 let mut form = reqwest::multipart::Form::new();
650 for (key, value) in fields {
651 form = form.text(key.clone(), value.clone());
652 }
653 self.send_multipart_without_response(path, form).await
654 }
655
    /// Posts a GraphQL `query` with `variables` to `path` and decodes the
    /// response's `data` into `T`.
    pub async fn post_graphql<T: DeserializeOwned + Default>(
        &self,
        path: &str,
        query: &str,
        variables: BTreeMap<String, Value>,
    ) -> Result<T> {
        self.post_graphql_optional_variables(path, query, Some(variables))
            .await
    }
666
    /// Posts a GraphQL `query` with optional `variables` and decodes the
    /// response's `data` into `T`.
    ///
    /// Returns `T::default()` when the response has no `data` or it is `null`.
    pub async fn post_graphql_optional_variables<T: DeserializeOwned + Default>(
        &self,
        path: &str,
        query: &str,
        variables: Option<BTreeMap<String, Value>>,
    ) -> Result<T> {
        // Start from the default; it is overwritten only when data is present.
        let mut result = T::default();
        self.post_graphql_optional_variables_into(path, query, variables, &mut result)
            .await?;
        Ok(result)
    }
679
    /// Posts a GraphQL `query` with `variables` and discards the returned
    /// data; GraphQL errors are still reported.
    pub async fn post_graphql_without_response(
        &self,
        path: &str,
        query: &str,
        variables: BTreeMap<String, Value>,
    ) -> Result<()> {
        self.post_graphql_optional_variables_without_response(path, query, Some(variables))
            .await
    }
690
    /// Posts a GraphQL `query` with optional `variables` and discards the
    /// returned data; GraphQL errors are still reported.
    pub async fn post_graphql_optional_variables_without_response(
        &self,
        path: &str,
        query: &str,
        variables: Option<BTreeMap<String, Value>>,
    ) -> Result<()> {
        self.post_graphql_response_envelope(path, query, variables)
            .await?;
        Ok(())
    }
702
    /// Posts a GraphQL `query` with `variables` and stores the decoded `data`
    /// in `result`, leaving it untouched when no data is returned.
    pub async fn post_graphql_into<T: DeserializeOwned>(
        &self,
        path: &str,
        query: &str,
        variables: BTreeMap<String, Value>,
        result: &mut T,
    ) -> Result<()> {
        self.post_graphql_optional_variables_into(path, query, Some(variables), result)
            .await
    }
714
715 pub async fn post_graphql_optional_variables_into<T: DeserializeOwned>(
717 &self,
718 path: &str,
719 query: &str,
720 variables: Option<BTreeMap<String, Value>>,
721 result: &mut T,
722 ) -> Result<()> {
723 let envelope = self
724 .post_graphql_response_envelope(path, query, variables)
725 .await?;
726 if let Some(data) = envelope.data
727 && !data.is_null()
728 {
729 *result = serde_json::from_value(data).map_err(|err| {
730 CliCoreError::message(format!("transport: decode graphql data: {err}"))
731 })?;
732 }
733 Ok(())
734 }
735
736 async fn do_json<B: Serialize, T: Default + DeserializeOwned>(
737 &self,
738 method: Method,
739 path: &str,
740 body: Option<&B>,
741 ) -> Result<T> {
742 let method_text = method.as_str().to_owned();
743 let response = self.send_with_retry(method, path, body).await?;
744 self.decode_json_response(response, &method_text, path)
745 .await
746 }
747
748 async fn post_graphql_response_envelope(
749 &self,
750 path: &str,
751 query: &str,
752 variables: Option<BTreeMap<String, Value>>,
753 ) -> Result<GraphQlEnvelope> {
754 #[derive(Serialize)]
755 struct Request<'query> {
756 query: &'query str,
757 variables: Option<BTreeMap<String, Value>>,
758 }
759
760 let envelope: GraphQlEnvelope = self.post(path, &Request { query, variables }).await?;
761 if !envelope.errors.is_empty() {
762 let message = envelope
763 .errors
764 .iter()
765 .map(|error| error.message.as_str())
766 .collect::<Vec<_>>()
767 .join("; ");
768 return Err(CliCoreError::message(format!("graphql: {message}")));
769 }
770 Ok(envelope)
771 }
772
773 async fn send_put_if_match<B: Serialize>(
774 &self,
775 path: &str,
776 body: &B,
777 etag: &str,
778 ) -> Result<reqwest::Response> {
779 let mut request = self
780 .build_request(Method::PUT, path, Some(body))?
781 .header(header::IF_MATCH, etag)
782 .build()
783 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
784 self.inject_auth(&mut request).await?;
785 self.log_request(&request);
786 self.base
787 .execute(request)
788 .await
789 .map_err(|err| CliCoreError::message(format!("transport: PUT {path}: {err}")))
790 }
791
    /// Posts `form` to `path` and decodes the JSON response into `T`.
    async fn send_multipart<T: Default + DeserializeOwned>(
        &self,
        path: &str,
        form: reqwest::multipart::Form,
    ) -> Result<T> {
        let response = self.send_multipart_response(path, form).await?;
        self.decode_json_response(response, "POST", path).await
    }
800
    /// Posts `form` to `path` and only checks that the status is not an error.
    async fn send_multipart_without_response(
        &self,
        path: &str,
        form: reqwest::multipart::Form,
    ) -> Result<()> {
        let response = self.send_multipart_response(path, form).await?;
        self.ensure_success_response(response, "POST", path).await
    }
809
810 async fn send_multipart_response(
811 &self,
812 path: &str,
813 form: reqwest::multipart::Form,
814 ) -> Result<reqwest::Response> {
815 let url = format!("{}{}", self.base_url, path);
816 let mut builder = self
817 .base
818 .post(url)
819 .header(header::USER_AGENT, self.user_agent.clone())
820 .multipart(form);
821 for (key, value) in &self.default_headers {
822 builder = builder.header(key, value);
823 }
824 let mut request = builder
825 .build()
826 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
827 self.inject_auth(&mut request).await?;
828 self.log_request(&request);
829 self.base
830 .execute(request)
831 .await
832 .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
833 }
834
835 async fn do_empty<B: Serialize>(
836 &self,
837 method: Method,
838 path: &str,
839 body: Option<&B>,
840 ) -> Result<()> {
841 let method_text = method.as_str().to_owned();
842 let response = self.send_with_retry(method, path, body).await?;
843 self.ensure_success_response(response, &method_text, path)
844 .await
845 }
846
847 async fn send_raw_once(
848 &self,
849 method: Method,
850 path: &str,
851 content_type: &str,
852 body: Option<Vec<u8>>,
853 ) -> Result<reqwest::Response> {
854 let url = format!("{}{}", self.base_url, path);
855 let method_text = method.as_str().to_owned();
856 let mut builder = self
857 .base
858 .request(method, url)
859 .header(header::USER_AGENT, self.user_agent.clone());
860 if let Some(body) = body {
861 builder = builder.body(body);
862 }
863 if !content_type.is_empty() {
864 builder = builder.header(header::CONTENT_TYPE, content_type);
865 }
866 for (key, value) in &self.default_headers {
867 builder = builder.header(key, value);
868 }
869 let mut request = builder
870 .build()
871 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
872 self.inject_auth(&mut request).await?;
873 self.log_request(&request);
874 self.base
875 .execute(request)
876 .await
877 .map_err(|err| CliCoreError::message(format!("transport: {method_text} {path}: {err}")))
878 }
879
880 async fn send_get_raw_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
881 let mut last_err = None;
882 for attempt in 0..MAX_RETRIES {
883 if attempt > 0 {
884 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
885 time::sleep(backoff).await;
886 }
887
888 match self.send_get_raw_once(path).await {
889 Ok(response) => {
890 let status = response.status();
891 if status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error() {
892 self.log_response("GET", path, status, response.headers(), None, None);
893 last_err = Some(CliCoreError::message(format!(
894 "transport: GET {}: status {}",
895 path,
896 status.as_u16()
897 )));
898 continue;
899 }
900 return Ok(response);
901 }
902 Err(err) => last_err = Some(err),
903 }
904 }
905 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
906 }
907
908 async fn send_get_raw_once(&self, path: &str) -> Result<reqwest::Response> {
909 let url = format!("{}{}", self.base_url, path);
910 let mut builder = self
911 .base
912 .get(url)
913 .header(header::USER_AGENT, self.user_agent.clone());
914 for (key, value) in &self.default_headers {
915 builder = builder.header(key, value);
916 }
917 let mut request = builder
918 .build()
919 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
920 self.inject_auth(&mut request).await?;
921 self.log_request(&request);
922 self.base
923 .execute(request)
924 .await
925 .map_err(|err| CliCoreError::message(format!("transport: GET {path}: {err}")))
926 }
927
928 async fn send_post_raw_once<B: Serialize>(
929 &self,
930 path: &str,
931 body: Option<&B>,
932 ) -> Result<reqwest::Response> {
933 let mut request = self
934 .build_request(Method::POST, path, body)?
935 .build()
936 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
937 self.inject_auth(&mut request).await?;
938 self.log_request(&request);
939 self.base
940 .execute(request)
941 .await
942 .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
943 }
944
945 async fn send_with_retry<B: Serialize>(
946 &self,
947 method: Method,
948 path: &str,
949 body: Option<&B>,
950 ) -> Result<reqwest::Response> {
951 let mut last_err = None;
952 for attempt in 0..MAX_RETRIES {
953 if attempt > 0 {
954 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
955 self.log_debug(
956 "retrying request",
957 [
958 ("attempt", (attempt + 1).to_string()),
959 ("backoff", format!("{backoff:?}")),
960 ],
961 );
962 time::sleep(backoff).await;
963 }
964
965 match self.send_once(method.clone(), path, body).await {
966 Ok(response) => {
967 if retryable_status(method.clone(), response.status()) {
968 last_err = Some(
969 self.retryable_status_error(response, method.as_str(), path)
970 .await,
971 );
972 continue;
973 }
974 return Ok(response);
975 }
976 Err(err) if is_idempotent(&method) => {
977 last_err = Some(err);
978 }
979 Err(err) => return Err(err),
980 }
981 }
982 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
983 }
984
985 async fn send_get_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
986 let mut last_err = None;
987 for attempt in 0..MAX_RETRIES {
988 if attempt > 0 {
989 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
990 self.log_debug(
991 "retrying request",
992 [
993 ("attempt", (attempt + 1).to_string()),
994 ("backoff", format!("{backoff:?}")),
995 ],
996 );
997 time::sleep(backoff).await;
998 }
999
1000 match self.send_once(Method::GET, path, Option::<&()>::None).await {
1001 Ok(response) => {
1002 let status = response.status();
1003 if status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error() {
1004 self.log_response("GET", path, status, response.headers(), None, None);
1005 last_err = Some(CliCoreError::message(format!(
1006 "transport: GET {}: status {}",
1007 path,
1008 status.as_u16()
1009 )));
1010 continue;
1011 }
1012 return Ok(response);
1013 }
1014 Err(err) => last_err = Some(err),
1015 }
1016 }
1017 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
1018 }
1019
1020 async fn send_once<B: Serialize>(
1021 &self,
1022 method: Method,
1023 path: &str,
1024 body: Option<&B>,
1025 ) -> Result<reqwest::Response> {
1026 let mut request = self
1027 .build_request(method.clone(), path, body)?
1028 .build()
1029 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
1030 self.inject_auth(&mut request).await?;
1031 let method_text = method.as_str().to_owned();
1032 self.log_request(&request);
1033 self.base
1034 .execute(request)
1035 .await
1036 .map_err(|err| CliCoreError::message(format!("transport: {method_text} {path}: {err}")))
1037 }
1038
1039 fn build_request<B: Serialize>(
1040 &self,
1041 method: Method,
1042 path: &str,
1043 body: Option<&B>,
1044 ) -> Result<reqwest::RequestBuilder> {
1045 let url = format!("{}{}", self.base_url, path);
1046 let mut builder = self
1047 .base
1048 .request(method, url)
1049 .header(header::USER_AGENT, self.user_agent.clone());
1050 if let Some(body) = body {
1051 let body = serde_json::to_vec(body)
1052 .map_err(|err| CliCoreError::message(format!("transport: marshal body: {err}")))?;
1053 builder = builder
1054 .header(header::CONTENT_TYPE, "application/json")
1055 .body(body);
1056 }
1057 for (key, value) in &self.default_headers {
1058 builder = builder.header(key, value);
1059 }
1060 Ok(builder)
1061 }
1062
1063 fn log_debug(
1064 &self,
1065 message: &'static str,
1066 fields: impl IntoIterator<Item = (&'static str, String)>,
1067 ) {
1068 if !self.logger.enabled() {
1069 return;
1070 }
1071 self.logger.debug(&TransportLogEvent {
1072 message,
1073 fields: fields
1074 .into_iter()
1075 .map(|(key, value)| (key.to_owned(), value))
1076 .collect(),
1077 headers: None,
1078 body: None,
1079 });
1080 }
1081
1082 fn log_request(&self, request: &reqwest::Request) {
1088 if !self.logger.enabled() {
1089 return;
1090 }
1091 self.logger.debug(&TransportLogEvent {
1092 message: "http request",
1093 fields: BTreeMap::from([
1094 ("method".to_owned(), request.method().as_str().to_owned()),
1095 ("url".to_owned(), request.url().as_str().to_owned()),
1096 ]),
1097 headers: Some(header_pairs(request.headers())),
1098 body: request
1099 .body()
1100 .and_then(reqwest::Body::as_bytes)
1101 .map(<[u8]>::to_vec),
1102 });
1103 }
1104
1105 fn log_response(
1109 &self,
1110 method: &str,
1111 path: &str,
1112 status: StatusCode,
1113 headers: &header::HeaderMap,
1114 body: Option<&[u8]>,
1115 body_bytes: Option<usize>,
1116 ) {
1117 if !self.logger.enabled() {
1118 return;
1119 }
1120 let mut fields = BTreeMap::from([
1121 ("status".to_owned(), status.as_u16().to_string()),
1122 ("method".to_owned(), method.to_owned()),
1123 ("url".to_owned(), format!("{}{}", self.base_url, path)),
1124 ]);
1125 if let Some(len) = body_bytes {
1126 fields.insert("body_bytes".to_owned(), len.to_string());
1127 }
1128 self.logger.debug(&TransportLogEvent {
1129 message: "http response",
1130 fields,
1131 headers: Some(header_pairs(headers)),
1132 body: body.map(<[u8]>::to_vec),
1133 });
1134 }
1135
1136 async fn read_and_log_response(
1144 &self,
1145 response: reqwest::Response,
1146 method: &str,
1147 path: &str,
1148 include_body: bool,
1149 ) -> Result<(StatusCode, Bytes)> {
1150 let status = response.status();
1151 let logging = self.logger.enabled();
1152 let headers = logging.then(|| response.headers().clone());
1153 let body = response.bytes().await.map_err(|err| {
1154 CliCoreError::message(format!("transport: read response body: {err}"))
1155 })?;
1156 if let Some(headers) = headers {
1157 if include_body {
1158 self.log_response(method, path, status, &headers, Some(&body), None);
1159 } else {
1160 self.log_response(method, path, status, &headers, None, Some(body.len()));
1161 }
1162 }
1163 Ok((status, body))
1164 }
1165
    /// Applies the configured auth injector to `request`, wrapping any failure
    /// in a "transport: auth inject" error.
    async fn inject_auth(&self, request: &mut reqwest::Request) -> Result<()> {
        self.auth
            .inject(request)
            .await
            .map_err(|err| CliCoreError::message(format!("transport: auth inject: {err}")))
    }
1172
1173 async fn decode_json_response<T: Default + DeserializeOwned>(
1174 &self,
1175 response: reqwest::Response,
1176 method: &str,
1177 path: &str,
1178 ) -> Result<T> {
1179 let (status, body) = self
1180 .read_and_log_response(response, method, path, true)
1181 .await?;
1182 if status.is_client_error() || status.is_server_error() {
1183 return Err(
1184 parse_error_body(status, &String::from_utf8_lossy(&body), method, path).into(),
1185 );
1186 }
1187 if status == StatusCode::NO_CONTENT {
1188 return Ok(T::default());
1189 }
1190 if body.trim_ascii() == b"null" {
1191 return Ok(T::default());
1192 }
1193 serde_json::from_slice::<T>(&body)
1194 .map_err(|err| CliCoreError::message(format!("transport: decode response: {err}")))
1195 }
1196
1197 async fn ensure_success_response(
1198 &self,
1199 response: reqwest::Response,
1200 method: &str,
1201 path: &str,
1202 ) -> Result<()> {
1203 let is_error = response.status().is_client_error() || response.status().is_server_error();
1208 if !is_error && !self.logger.enabled() {
1209 return Ok(());
1210 }
1211 let (status, body) = self
1212 .read_and_log_response(response, method, path, true)
1213 .await?;
1214 if status.is_client_error() || status.is_server_error() {
1215 return Err(
1216 parse_error_body(status, &String::from_utf8_lossy(&body), method, path).into(),
1217 );
1218 }
1219 Ok(())
1220 }
1221
1222 async fn retryable_status_error(
1223 &self,
1224 response: reqwest::Response,
1225 method: &str,
1226 path: &str,
1227 ) -> CliCoreError {
1228 let status = response.status();
1229 let headers = self.logger.enabled().then(|| response.headers().clone());
1230 match response.bytes().await {
1231 Ok(body) => {
1232 if let Some(headers) = &headers {
1233 self.log_response(method, path, status, headers, Some(&body), None);
1234 }
1235 CliCoreError::message(format!(
1236 "transport: {method} {path}: status {}: {}",
1237 status.as_u16(),
1238 String::from_utf8_lossy(&body)
1239 ))
1240 }
1241 Err(err) => {
1242 if let Some(headers) = &headers {
1243 self.log_response(method, path, status, headers, None, None);
1244 }
1245 CliCoreError::message(format!(
1246 "transport: {method} {path}: status {} (body read failed: {err})",
1247 status.as_u16()
1248 ))
1249 }
1250 }
1251 }
1252}
1253
1254fn header_pairs(headers: &header::HeaderMap) -> Vec<(String, String)> {
1259 headers
1260 .iter()
1261 .map(|(name, value)| {
1262 let value = value.to_str().map_or_else(
1263 |_| format!("<{} non-utf8 bytes>", value.as_bytes().len()),
1264 str::to_owned,
1265 );
1266 (name.as_str().to_owned(), value)
1267 })
1268 .collect()
1269}
1270
1271pub async fn parse_error_response(response: reqwest::Response, method: &str, path: &str) -> Error {
1278 let status = response.status();
1279 let body = response.text().await.unwrap_or_default();
1280 parse_error_body(status, &body, method, path)
1281}
1282
1283fn parse_error_body(status: StatusCode, body: &str, method: &str, path: &str) -> Error {
1284 if let Ok(mut api_error) = serde_json::from_str::<Error>(body)
1285 && !api_error.message.is_empty()
1286 {
1287 api_error.code = format!("HTTP_{}", status.as_u16());
1288 return api_error;
1289 }
1290 Error {
1291 code: format!("HTTP_{}", status.as_u16()),
1292 message: format!("{} {}: {} {}", method, path, status.as_u16(), body),
1293 system: String::new(),
1294 request_id: String::new(),
1295 }
1296}
1297
1298fn retryable_status(method: Method, status: StatusCode) -> bool {
1299 status == StatusCode::TOO_MANY_REQUESTS || (status.is_server_error() && is_idempotent(&method))
1300}
1301
1302fn is_idempotent(method: &Method) -> bool {
1303 matches!(*method, Method::GET | Method::HEAD | Method::DELETE)
1304}