1use std::{
2 collections::BTreeMap,
3 io::Write,
4 path::Path,
5 sync::{Arc, OnceLock, RwLock},
6 time::Duration,
7};
8
9use reqwest::{Method, StatusCode, header};
10use serde::{Serialize, de::DeserializeOwned};
11use serde_json::Value;
12use tokio::time;
13
14use super::{AuthInjector, Error};
15use crate::{CliCoreError, Result};
16
/// Total number of attempts made by the retry loops in this file.
const MAX_RETRIES: usize = 3;
/// Delay before the first retry; the retry loops double it for each later retry.
const BASE_BACKOFF: Duration = Duration::from_millis(500);
/// User agent reported until [`set_default_user_agent`] installs another value.
const BUILTIN_DEFAULT_USER_AGENT: &str = "cli/dev";
/// Process-wide default user agent, created lazily on first access.
static DEFAULT_USER_AGENT: OnceLock<RwLock<String>> = OnceLock::new();

/// Returns the process-wide user-agent cell, seeding it with the builtin
/// value the first time it is touched.
fn default_user_agent_cell() -> &'static RwLock<String> {
    DEFAULT_USER_AGENT.get_or_init(|| RwLock::new(BUILTIN_DEFAULT_USER_AGENT.to_owned()))
}

/// Replaces the process-wide default user agent.
///
/// Builders created afterwards pick up the new value; builders and clients
/// that already exist keep the value they captured.
///
/// A poisoned lock is recovered rather than ignored: the guarded value is a
/// plain `String`, so it is always safe to overwrite.
pub fn set_default_user_agent(user_agent: impl Into<String>) {
    // Convert before taking the lock so a panicking `Into` impl cannot poison it.
    let value = user_agent.into();
    let mut current = default_user_agent_cell()
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    *current = value;
}

/// Returns a copy of the current process-wide default user agent.
///
/// A poisoned lock is recovered so the configured value is still returned
/// instead of silently falling back to the builtin one.
pub(crate) fn default_user_agent() -> String {
    default_user_agent_cell()
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone()
}
51
#[cfg(test)]
/// Test-only mutex.
///
/// NOTE(review): presumably held by tests that change the process-wide default
/// user agent so they do not interleave — confirm against the call sites.
pub(crate) static UA_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
57
#[cfg(test)]
/// Test-only guard that resets the process-wide default user agent to the
/// builtin value when it is dropped.
pub(crate) struct RestoreDefaultUserAgent;

#[cfg(test)]
impl Drop for RestoreDefaultUserAgent {
    /// Restores the builtin default user agent.
    fn drop(&mut self) {
        set_default_user_agent(BUILTIN_DEFAULT_USER_AGENT);
    }
}
71
/// One entry of the `errors` array in a GraphQL response; only the message is read.
#[derive(serde::Deserialize)]
struct GraphQlError {
    /// Error text, joined into the error reported to the caller.
    message: String,
}
76
/// Top-level shape of a GraphQL response as consumed by this client.
#[derive(Default, serde::Deserialize)]
struct GraphQlEnvelope {
    /// Payload of the response; may be absent or JSON `null`.
    data: Option<Value>,
    /// Reported errors; treated as empty when the field is missing.
    #[serde(default)]
    errors: Vec<GraphQlError>,
}
83
/// A single debug event emitted by the transport layer.
#[derive(Clone, Debug)]
pub struct TransportLogEvent {
    /// Fixed label describing the event (for example `"http request"`).
    pub message: &'static str,
    /// Key/value details attached to the event, such as method, URL or status.
    pub fields: BTreeMap<String, String>,
}
92
/// Sink for the debug events produced by [`HttpClient`].
pub trait TransportLogger: Send + Sync + std::fmt::Debug {
    /// Receives one debug event.
    fn debug(&self, event: &TransportLogEvent);
}
98
/// Logger that discards every event; used by builders unless another logger is set.
#[derive(Clone, Debug, Default)]
pub struct NoopTransportLogger;

impl TransportLogger for NoopTransportLogger {
    /// Ignores the event.
    fn debug(&self, _event: &TransportLogEvent) {}
}
106
/// HTTP client that prefixes request paths with a base URL, adds a user agent
/// and default headers, and passes every request through an [`AuthInjector`]
/// before sending it.
#[derive(Clone, Debug)]
pub struct HttpClient {
    /// Underlying reqwest client used to build and execute requests.
    base: reqwest::Client,
    /// Prefix concatenated (without any separator handling) with each request path.
    base_url: String,
    /// Applied to every built request before it is executed.
    auth: Arc<dyn AuthInjector>,
    /// Value sent in the `User-Agent` header.
    user_agent: String,
    /// Extra headers added to every request.
    default_headers: BTreeMap<String, String>,
    /// Receiver of debug events describing requests, responses and retries.
    logger: Arc<dyn TransportLogger>,
}
122
/// Collects the settings for an [`HttpClient`] before it is built.
#[derive(Clone, Debug)]
pub struct HttpClientBuilder {
    /// Prefix for every request path.
    base_url: String,
    /// Authentication hook handed to the built client.
    auth: Arc<dyn AuthInjector>,
    /// `User-Agent` value; starts as the process-wide default.
    user_agent: String,
    /// Extra headers for every request; starts empty.
    default_headers: BTreeMap<String, String>,
    /// Debug-event sink; starts as [`NoopTransportLogger`].
    logger: Arc<dyn TransportLogger>,
}
132
133impl HttpClientBuilder {
134 #[must_use]
136 pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
137 Self {
138 base_url: base_url.into(),
139 auth,
140 user_agent: default_user_agent(),
141 default_headers: BTreeMap::new(),
142 logger: Arc::new(NoopTransportLogger),
143 }
144 }
145
146 #[must_use]
148 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
149 self.user_agent = user_agent.into();
150 self
151 }
152
153 #[must_use]
155 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Self {
156 self.user_agent(user_agent)
157 }
158
159 #[must_use]
161 pub fn default_headers(mut self, headers: BTreeMap<String, String>) -> Self {
162 self.default_headers = headers;
163 self
164 }
165
166 #[must_use]
168 pub fn with_default_headers(self, headers: BTreeMap<String, String>) -> Self {
169 self.default_headers(headers)
170 }
171
172 #[must_use]
174 pub fn logger(mut self, logger: Arc<dyn TransportLogger>) -> Self {
175 self.logger = logger;
176 self
177 }
178
179 #[must_use]
181 pub fn with_logger(self, logger: Arc<dyn TransportLogger>) -> Self {
182 self.logger(logger)
183 }
184
185 #[must_use]
187 pub fn build(self) -> HttpClient {
188 HttpClient {
189 base: reqwest::Client::new(),
190 base_url: self.base_url,
191 auth: self.auth,
192 user_agent: self.user_agent,
193 default_headers: self.default_headers,
194 logger: self.logger,
195 }
196 }
197}
198
199impl HttpClient {
    /// Returns an [`HttpClientBuilder`] for `base_url` and `auth`.
    #[must_use]
    pub fn builder(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> HttpClientBuilder {
        HttpClientBuilder::new(base_url, auth)
    }
205
206 #[must_use]
208 pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
209 HttpClientBuilder::new(base_url, auth).build()
210 }
211
212 pub async fn get<T: Default + DeserializeOwned>(&self, path: &str) -> Result<T> {
214 self.do_json(Method::GET, path, Option::<&()>::None).await
215 }
216
217 pub async fn get_without_response(&self, path: &str) -> Result<()> {
219 self.do_empty(Method::GET, path, Option::<&()>::None).await
220 }
221
    /// Sends `body` as JSON in a POST to `path` and decodes the JSON response into `T`.
    ///
    /// A `204 No Content` status or a JSON `null` body yields `T::default()`.
    pub async fn post<B: Serialize, T: Default + DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
    ) -> Result<T> {
        self.do_json(Method::POST, path, Some(body)).await
    }
230
    /// Sends `body` as JSON in a POST to `path`; succeeds when the status is
    /// not a 4xx/5xx error and ignores the response body.
    pub async fn post_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::POST, path, Some(body)).await
    }
235
    /// Sends `body` as JSON in a PUT to `path` and decodes the JSON response into `T`.
    ///
    /// A `204 No Content` status or a JSON `null` body yields `T::default()`.
    pub async fn put<B: Serialize, T: Default + DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
    ) -> Result<T> {
        self.do_json(Method::PUT, path, Some(body)).await
    }
244
    /// Sends `body` as JSON in a PUT to `path`; succeeds when the status is
    /// not a 4xx/5xx error and ignores the response body.
    pub async fn put_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::PUT, path, Some(body)).await
    }
249
    /// Sends `body` as JSON in a PATCH to `path` and decodes the JSON response into `T`.
    ///
    /// A `204 No Content` status or a JSON `null` body yields `T::default()`.
    pub async fn patch<B: Serialize, T: Default + DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
    ) -> Result<T> {
        self.do_json(Method::PATCH, path, Some(body)).await
    }
258
    /// Sends `body` as JSON in a PATCH to `path`; succeeds when the status is
    /// not a 4xx/5xx error and ignores the response body.
    pub async fn patch_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::PATCH, path, Some(body)).await
    }
263
264 pub async fn delete(&self, path: &str) -> Result<()> {
266 self.do_empty(Method::DELETE, path, Option::<&()>::None)
267 .await
268 }
269
    /// Issues a DELETE to `path` carrying `body` as JSON; succeeds when the
    /// status is not a 4xx/5xx error.
    pub async fn delete_with_body<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::DELETE, path, Some(body)).await
    }
274
275 pub async fn get_etag<T: Default + DeserializeOwned>(&self, path: &str) -> Result<(T, String)> {
277 let response = self.send_get_status_only_retry(path).await?;
278 let etag = response
279 .headers()
280 .get(header::ETAG)
281 .and_then(|value| value.to_str().ok())
282 .unwrap_or_default()
283 .to_owned();
284 let value = decode_json_response(response, "GET", path).await?;
285 Ok((value, etag))
286 }
287
288 pub async fn get_etag_without_response(&self, path: &str) -> Result<String> {
290 let response = self.send_get_status_only_retry(path).await?;
291 let etag = response
292 .headers()
293 .get(header::ETAG)
294 .and_then(|value| value.to_str().ok())
295 .unwrap_or_default()
296 .to_owned();
297 ensure_success_response(response, "GET", path).await?;
298 Ok(etag)
299 }
300
301 pub async fn put_if_match<B: Serialize, T: Default + DeserializeOwned>(
303 &self,
304 path: &str,
305 body: &B,
306 etag: &str,
307 ) -> Result<T> {
308 let response = self.send_put_if_match(path, body, etag).await?;
309 decode_json_response(response, "PUT", path).await
310 }
311
312 pub async fn put_if_match_without_response<B: Serialize>(
314 &self,
315 path: &str,
316 body: &B,
317 etag: &str,
318 ) -> Result<()> {
319 let response = self.send_put_if_match(path, body, etag).await?;
320 ensure_success_response(response, "PUT", path).await
321 }
322
323 pub async fn get_raw(&self, path: &str, writer: &mut dyn Write) -> Result<()> {
325 let response = self.send_get_raw_status_only_retry(path).await?;
326 if response.status().is_client_error() || response.status().is_server_error() {
327 return Err(parse_error_response(response, "GET", path).await.into());
328 }
329 let bytes = response
330 .bytes()
331 .await
332 .map_err(|err| CliCoreError::message(format!("transport: stream response: {err}")))?;
333 writer.write_all(&bytes)?;
334 Ok(())
335 }
336
337 pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
339 let response = self.send_get_raw_status_only_retry(path).await?;
340 if response.status().is_client_error() || response.status().is_server_error() {
341 return Err(parse_error_response(response, "GET", path).await.into());
342 }
343 let bytes = response
344 .bytes()
345 .await
346 .map_err(|err| CliCoreError::message(format!("transport: stream response: {err}")))?;
347 Ok(bytes.to_vec())
348 }
349
350 pub async fn post_raw<B: Serialize>(
352 &self,
353 path: &str,
354 body: Option<&B>,
355 writer: &mut dyn Write,
356 ) -> Result<()> {
357 let response = self.send_post_raw_once(path, body).await?;
358 if response.status().is_client_error() || response.status().is_server_error() {
359 return Err(parse_error_response(response, "POST", path).await.into());
360 }
361 let bytes = response
362 .bytes()
363 .await
364 .map_err(|err| CliCoreError::message(format!("transport: stream response: {err}")))?;
365 writer.write_all(&bytes)?;
366 Ok(())
367 }
368
369 pub async fn do_raw<T: Default + DeserializeOwned>(
371 &self,
372 method: Method,
373 path: &str,
374 content_type: &str,
375 body: impl Into<Vec<u8>>,
376 ) -> Result<T> {
377 self.do_raw_optional_body(method, path, content_type, Some(body.into()))
378 .await
379 }
380
381 pub async fn do_raw_optional_body<T: Default + DeserializeOwned>(
383 &self,
384 method: Method,
385 path: &str,
386 content_type: &str,
387 body: Option<Vec<u8>>,
388 ) -> Result<T> {
389 let method_text = method.as_str().to_owned();
390 let response = self.send_raw_once(method, path, content_type, body).await?;
391 decode_json_response(response, &method_text, path).await
392 }
393
394 pub async fn do_raw_without_response(
396 &self,
397 method: Method,
398 path: &str,
399 content_type: &str,
400 body: impl Into<Vec<u8>>,
401 ) -> Result<()> {
402 self.do_raw_optional_body_without_response(method, path, content_type, Some(body.into()))
403 .await
404 }
405
406 pub async fn do_raw_optional_body_without_response(
408 &self,
409 method: Method,
410 path: &str,
411 content_type: &str,
412 body: Option<Vec<u8>>,
413 ) -> Result<()> {
414 let method_text = method.as_str().to_owned();
415 let response = self.send_raw_once(method, path, content_type, body).await?;
416 ensure_success_response(response, &method_text, path).await
417 }
418
419 pub async fn post_multipart<T: Default + DeserializeOwned>(
421 &self,
422 path: &str,
423 field_name: &str,
424 file_path: &Path,
425 ) -> Result<T> {
426 self.post_multipart_with_fields(path, field_name, file_path, &BTreeMap::new())
427 .await
428 }
429
430 pub async fn post_multipart_without_response(
432 &self,
433 path: &str,
434 field_name: &str,
435 file_path: &Path,
436 ) -> Result<()> {
437 self.post_multipart_with_fields_without_response(
438 path,
439 field_name,
440 file_path,
441 &BTreeMap::new(),
442 )
443 .await
444 }
445
446 pub async fn post_multipart_with_fields<T: Default + DeserializeOwned>(
448 &self,
449 path: &str,
450 file_field: &str,
451 file_path: &Path,
452 fields: &BTreeMap<String, String>,
453 ) -> Result<T> {
454 let form = self.multipart_form(file_field, file_path, fields).await?;
455 self.send_multipart(path, form).await
456 }
457
458 async fn multipart_form(
459 &self,
460 file_field: &str,
461 file_path: &Path,
462 fields: &BTreeMap<String, String>,
463 ) -> Result<reqwest::multipart::Form> {
464 let mut form = reqwest::multipart::Form::new();
465 for (key, value) in fields {
466 form = form.text(key.clone(), value.clone());
467 }
468 let file_name = file_path
469 .file_name()
470 .and_then(|name| name.to_str())
471 .unwrap_or("file")
472 .to_owned();
473 let bytes = tokio::fs::read(file_path)
474 .await
475 .map_err(|err| CliCoreError::message(format!("transport: open file: {err}")))?;
476 let part = reqwest::multipart::Part::bytes(bytes).file_name(file_name);
477 form = form.part(file_field.to_owned(), part);
478 Ok(form)
479 }
480
481 pub async fn post_multipart_with_fields_without_response(
483 &self,
484 path: &str,
485 file_field: &str,
486 file_path: &Path,
487 fields: &BTreeMap<String, String>,
488 ) -> Result<()> {
489 let form = self.multipart_form(file_field, file_path, fields).await?;
490 self.send_multipart_without_response(path, form).await
491 }
492
493 pub async fn post_multipart_fields<T: Default + DeserializeOwned>(
495 &self,
496 path: &str,
497 fields: &BTreeMap<String, String>,
498 ) -> Result<T> {
499 let mut form = reqwest::multipart::Form::new();
500 for (key, value) in fields {
501 form = form.text(key.clone(), value.clone());
502 }
503 self.send_multipart(path, form).await
504 }
505
506 pub async fn post_multipart_fields_without_response(
508 &self,
509 path: &str,
510 fields: &BTreeMap<String, String>,
511 ) -> Result<()> {
512 let mut form = reqwest::multipart::Form::new();
513 for (key, value) in fields {
514 form = form.text(key.clone(), value.clone());
515 }
516 self.send_multipart_without_response(path, form).await
517 }
518
    /// Posts a GraphQL `query` with `variables` to `path` and decodes the
    /// response's `data` into `T`.
    ///
    /// Returns `T::default()` when `data` is absent or `null`, and an error
    /// when the response lists any GraphQL errors.
    pub async fn post_graphql<T: DeserializeOwned + Default>(
        &self,
        path: &str,
        query: &str,
        variables: BTreeMap<String, Value>,
    ) -> Result<T> {
        self.post_graphql_optional_variables(path, query, Some(variables))
            .await
    }
529
530 pub async fn post_graphql_optional_variables<T: DeserializeOwned + Default>(
532 &self,
533 path: &str,
534 query: &str,
535 variables: Option<BTreeMap<String, Value>>,
536 ) -> Result<T> {
537 let mut result = T::default();
538 self.post_graphql_optional_variables_into(path, query, variables, &mut result)
539 .await?;
540 Ok(result)
541 }
542
    /// Posts a GraphQL `query` with `variables` to `path` and discards the
    /// response's `data`; fails when the response lists any GraphQL errors.
    pub async fn post_graphql_without_response(
        &self,
        path: &str,
        query: &str,
        variables: BTreeMap<String, Value>,
    ) -> Result<()> {
        self.post_graphql_optional_variables_without_response(path, query, Some(variables))
            .await
    }
553
554 pub async fn post_graphql_optional_variables_without_response(
556 &self,
557 path: &str,
558 query: &str,
559 variables: Option<BTreeMap<String, Value>>,
560 ) -> Result<()> {
561 self.post_graphql_response_envelope(path, query, variables)
562 .await?;
563 Ok(())
564 }
565
    /// Posts a GraphQL `query` with `variables` to `path` and, when the
    /// response carries non-null `data`, decodes it into `result`.
    ///
    /// `result` is left untouched when `data` is absent or `null`.
    pub async fn post_graphql_into<T: DeserializeOwned>(
        &self,
        path: &str,
        query: &str,
        variables: BTreeMap<String, Value>,
        result: &mut T,
    ) -> Result<()> {
        self.post_graphql_optional_variables_into(path, query, Some(variables), result)
            .await
    }
577
578 pub async fn post_graphql_optional_variables_into<T: DeserializeOwned>(
580 &self,
581 path: &str,
582 query: &str,
583 variables: Option<BTreeMap<String, Value>>,
584 result: &mut T,
585 ) -> Result<()> {
586 let envelope = self
587 .post_graphql_response_envelope(path, query, variables)
588 .await?;
589 if let Some(data) = envelope.data
590 && !data.is_null()
591 {
592 *result = serde_json::from_value(data).map_err(|err| {
593 CliCoreError::message(format!("transport: decode graphql data: {err}"))
594 })?;
595 }
596 Ok(())
597 }
598
599 async fn do_json<B: Serialize, T: Default + DeserializeOwned>(
600 &self,
601 method: Method,
602 path: &str,
603 body: Option<&B>,
604 ) -> Result<T> {
605 let method_text = method.as_str().to_owned();
606 let response = self.send_with_retry(method, path, body).await?;
607 decode_json_response(response, &method_text, path).await
608 }
609
610 async fn post_graphql_response_envelope(
611 &self,
612 path: &str,
613 query: &str,
614 variables: Option<BTreeMap<String, Value>>,
615 ) -> Result<GraphQlEnvelope> {
616 #[derive(Serialize)]
617 struct Request<'query> {
618 query: &'query str,
619 variables: Option<BTreeMap<String, Value>>,
620 }
621
622 let envelope: GraphQlEnvelope = self.post(path, &Request { query, variables }).await?;
623 if !envelope.errors.is_empty() {
624 let message = envelope
625 .errors
626 .iter()
627 .map(|error| error.message.as_str())
628 .collect::<Vec<_>>()
629 .join("; ");
630 return Err(CliCoreError::message(format!("graphql: {message}")));
631 }
632 Ok(envelope)
633 }
634
635 async fn send_put_if_match<B: Serialize>(
636 &self,
637 path: &str,
638 body: &B,
639 etag: &str,
640 ) -> Result<reqwest::Response> {
641 let mut request = self
642 .build_request(Method::PUT, path, Some(body))?
643 .header(header::IF_MATCH, etag)
644 .build()
645 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
646 self.inject_auth(&mut request).await?;
647 let url = format!("{}{}", self.base_url, path);
648 self.log_debug(
649 "http request",
650 [("method", "PUT".to_owned()), ("url", url.clone())],
651 );
652 let response = self
653 .base
654 .execute(request)
655 .await
656 .map_err(|err| CliCoreError::message(format!("transport: PUT {path}: {err}")))?;
657 self.log_debug(
658 "http response",
659 [
660 ("status", response.status().as_u16().to_string()),
661 ("method", "PUT".to_owned()),
662 ("url", url),
663 ],
664 );
665 Ok(response)
666 }
667
668 async fn send_multipart<T: Default + DeserializeOwned>(
669 &self,
670 path: &str,
671 form: reqwest::multipart::Form,
672 ) -> Result<T> {
673 let response = self.send_multipart_response(path, form).await?;
674 decode_json_response(response, "POST", path).await
675 }
676
677 async fn send_multipart_without_response(
678 &self,
679 path: &str,
680 form: reqwest::multipart::Form,
681 ) -> Result<()> {
682 let response = self.send_multipart_response(path, form).await?;
683 ensure_success_response(response, "POST", path).await
684 }
685
686 async fn send_multipart_response(
687 &self,
688 path: &str,
689 form: reqwest::multipart::Form,
690 ) -> Result<reqwest::Response> {
691 let url = format!("{}{}", self.base_url, path);
692 let mut builder = self
693 .base
694 .post(url.clone())
695 .header(header::USER_AGENT, self.user_agent.clone())
696 .multipart(form);
697 for (key, value) in &self.default_headers {
698 builder = builder.header(key, value);
699 }
700 let mut request = builder
701 .build()
702 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
703 self.inject_auth(&mut request).await?;
704 self.log_debug("http multipart request", [("url", url)]);
705 self.base
706 .execute(request)
707 .await
708 .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
709 }
710
711 async fn do_empty<B: Serialize>(
712 &self,
713 method: Method,
714 path: &str,
715 body: Option<&B>,
716 ) -> Result<()> {
717 let method_text = method.as_str().to_owned();
718 let response = self.send_with_retry(method, path, body).await?;
719 ensure_success_response(response, &method_text, path).await
720 }
721
722 async fn send_raw_once(
723 &self,
724 method: Method,
725 path: &str,
726 content_type: &str,
727 body: Option<Vec<u8>>,
728 ) -> Result<reqwest::Response> {
729 let url = format!("{}{}", self.base_url, path);
730 let method_text = method.as_str().to_owned();
731 let mut builder = self
732 .base
733 .request(method, url)
734 .header(header::USER_AGENT, self.user_agent.clone());
735 if let Some(body) = body {
736 builder = builder.body(body);
737 }
738 if !content_type.is_empty() {
739 builder = builder.header(header::CONTENT_TYPE, content_type);
740 }
741 for (key, value) in &self.default_headers {
742 builder = builder.header(key, value);
743 }
744 let mut request = builder
745 .build()
746 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
747 self.inject_auth(&mut request).await?;
748 self.log_debug(
749 "http request",
750 [
751 ("method", method_text.clone()),
752 ("url", format!("{}{}", self.base_url, path)),
753 ],
754 );
755 self.base
756 .execute(request)
757 .await
758 .map_err(|err| CliCoreError::message(format!("transport: {method_text} {path}: {err}")))
759 }
760
761 async fn send_get_raw_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
762 let mut last_err = None;
763 for attempt in 0..MAX_RETRIES {
764 if attempt > 0 {
765 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
766 time::sleep(backoff).await;
767 }
768
769 match self.send_get_raw_once(path).await {
770 Ok(response) => {
771 if response.status() == StatusCode::TOO_MANY_REQUESTS
772 || response.status().is_server_error()
773 {
774 last_err = Some(CliCoreError::message(format!(
775 "transport: GET {}: status {}",
776 path,
777 response.status().as_u16()
778 )));
779 continue;
780 }
781 return Ok(response);
782 }
783 Err(err) => last_err = Some(err),
784 }
785 }
786 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
787 }
788
789 async fn send_get_raw_once(&self, path: &str) -> Result<reqwest::Response> {
790 let url = format!("{}{}", self.base_url, path);
791 let mut builder = self
792 .base
793 .get(url.clone())
794 .header(header::USER_AGENT, self.user_agent.clone());
795 for (key, value) in &self.default_headers {
796 builder = builder.header(key, value);
797 }
798 let mut request = builder
799 .build()
800 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
801 self.inject_auth(&mut request).await?;
802 self.log_debug("http raw request", [("url", url)]);
803 self.base
804 .execute(request)
805 .await
806 .map_err(|err| CliCoreError::message(format!("transport: GET {path}: {err}")))
807 }
808
809 async fn send_post_raw_once<B: Serialize>(
810 &self,
811 path: &str,
812 body: Option<&B>,
813 ) -> Result<reqwest::Response> {
814 let mut request = self
815 .build_request(Method::POST, path, body)?
816 .build()
817 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
818 self.inject_auth(&mut request).await?;
819 self.log_debug(
820 "http post raw request",
821 [("url", format!("{}{}", self.base_url, path))],
822 );
823 self.base
824 .execute(request)
825 .await
826 .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
827 }
828
829 async fn send_with_retry<B: Serialize>(
830 &self,
831 method: Method,
832 path: &str,
833 body: Option<&B>,
834 ) -> Result<reqwest::Response> {
835 let mut last_err = None;
836 for attempt in 0..MAX_RETRIES {
837 if attempt > 0 {
838 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
839 self.log_debug(
840 "retrying request",
841 [
842 ("attempt", (attempt + 1).to_string()),
843 ("backoff", format!("{backoff:?}")),
844 ],
845 );
846 time::sleep(backoff).await;
847 }
848
849 match self.send_once(method.clone(), path, body).await {
850 Ok(response) => {
851 if retryable_status(method.clone(), response.status()) {
852 last_err =
853 Some(retryable_status_error(response, method.as_str(), path).await);
854 continue;
855 }
856 return Ok(response);
857 }
858 Err(err) if is_idempotent(&method) => {
859 last_err = Some(err);
860 }
861 Err(err) => return Err(err),
862 }
863 }
864 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
865 }
866
867 async fn send_get_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
868 let mut last_err = None;
869 for attempt in 0..MAX_RETRIES {
870 if attempt > 0 {
871 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
872 self.log_debug(
873 "retrying request",
874 [
875 ("attempt", (attempt + 1).to_string()),
876 ("backoff", format!("{backoff:?}")),
877 ],
878 );
879 time::sleep(backoff).await;
880 }
881
882 match self.send_once(Method::GET, path, Option::<&()>::None).await {
883 Ok(response) => {
884 if response.status() == StatusCode::TOO_MANY_REQUESTS
885 || response.status().is_server_error()
886 {
887 last_err = Some(CliCoreError::message(format!(
888 "transport: GET {}: status {}",
889 path,
890 response.status().as_u16()
891 )));
892 continue;
893 }
894 return Ok(response);
895 }
896 Err(err) => last_err = Some(err),
897 }
898 }
899 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
900 }
901
902 async fn send_once<B: Serialize>(
903 &self,
904 method: Method,
905 path: &str,
906 body: Option<&B>,
907 ) -> Result<reqwest::Response> {
908 let mut request = self
909 .build_request(method.clone(), path, body)?
910 .build()
911 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
912 self.inject_auth(&mut request).await?;
913 let method_text = method.as_str().to_owned();
914 self.log_debug(
915 "http request",
916 [
917 ("method", method_text.clone()),
918 ("url", format!("{}{}", self.base_url, path)),
919 ],
920 );
921 let response = self.base.execute(request).await.map_err(|err| {
922 CliCoreError::message(format!("transport: {method_text} {path}: {err}"))
923 })?;
924 self.log_debug(
925 "http response",
926 [
927 ("status", response.status().as_u16().to_string()),
928 ("method", method_text),
929 ("url", format!("{}{}", self.base_url, path)),
930 ],
931 );
932 Ok(response)
933 }
934
935 fn build_request<B: Serialize>(
936 &self,
937 method: Method,
938 path: &str,
939 body: Option<&B>,
940 ) -> Result<reqwest::RequestBuilder> {
941 let url = format!("{}{}", self.base_url, path);
942 let mut builder = self
943 .base
944 .request(method, url)
945 .header(header::USER_AGENT, self.user_agent.clone());
946 if let Some(body) = body {
947 let body = serde_json::to_vec(body)
948 .map_err(|err| CliCoreError::message(format!("transport: marshal body: {err}")))?;
949 builder = builder
950 .header(header::CONTENT_TYPE, "application/json")
951 .body(body);
952 }
953 for (key, value) in &self.default_headers {
954 builder = builder.header(key, value);
955 }
956 Ok(builder)
957 }
958
959 fn log_debug(
960 &self,
961 message: &'static str,
962 fields: impl IntoIterator<Item = (&'static str, String)>,
963 ) {
964 self.logger.debug(&TransportLogEvent {
965 message,
966 fields: fields
967 .into_iter()
968 .map(|(key, value)| (key.to_owned(), value))
969 .collect(),
970 });
971 }
972
973 async fn inject_auth(&self, request: &mut reqwest::Request) -> Result<()> {
974 self.auth
975 .inject(request)
976 .await
977 .map_err(|err| CliCoreError::message(format!("transport: auth inject: {err}")))
978 }
979}
980
981async fn decode_json_response<T: Default + DeserializeOwned>(
982 response: reqwest::Response,
983 method: &str,
984 path: &str,
985) -> Result<T> {
986 if response.status().is_client_error() || response.status().is_server_error() {
987 return Err(parse_error_response(response, method, path).await.into());
988 }
989 if response.status() == StatusCode::NO_CONTENT {
990 return Ok(T::default());
991 }
992 let body = response
993 .bytes()
994 .await
995 .map_err(|err| CliCoreError::message(format!("transport: decode response: {err}")))?;
996 if body.trim_ascii() == b"null" {
997 return Ok(T::default());
998 }
999 serde_json::from_slice::<T>(&body)
1000 .map_err(|err| CliCoreError::message(format!("transport: decode response: {err}")))
1001}
1002
1003async fn ensure_success_response(
1004 response: reqwest::Response,
1005 method: &str,
1006 path: &str,
1007) -> Result<()> {
1008 if response.status().is_client_error() || response.status().is_server_error() {
1009 return Err(parse_error_response(response, method, path).await.into());
1010 }
1011 Ok(())
1012}
1013
1014pub async fn parse_error_response(response: reqwest::Response, method: &str, path: &str) -> Error {
1021 let status = response.status();
1022 let body = response.text().await.unwrap_or_default();
1023 parse_error_body(status, &body, method, path)
1024}
1025
1026fn parse_error_body(status: StatusCode, body: &str, method: &str, path: &str) -> Error {
1027 if let Ok(mut api_error) = serde_json::from_str::<Error>(body)
1028 && !api_error.message.is_empty()
1029 {
1030 api_error.code = format!("HTTP_{}", status.as_u16());
1031 return api_error;
1032 }
1033 Error {
1034 code: format!("HTTP_{}", status.as_u16()),
1035 message: format!("{} {}: {} {}", method, path, status.as_u16(), body),
1036 system: String::new(),
1037 request_id: String::new(),
1038 }
1039}
1040
1041fn retryable_status(method: Method, status: StatusCode) -> bool {
1042 status == StatusCode::TOO_MANY_REQUESTS || (status.is_server_error() && is_idempotent(&method))
1043}
1044
1045async fn retryable_status_error(
1046 response: reqwest::Response,
1047 method: &str,
1048 path: &str,
1049) -> CliCoreError {
1050 let status = response.status().as_u16();
1051 match response.text().await {
1052 Ok(body) => CliCoreError::message(format!(
1053 "transport: {method} {path}: status {status}: {body}"
1054 )),
1055 Err(err) => CliCoreError::message(format!(
1056 "transport: {method} {path}: status {status} (body read failed: {err})"
1057 )),
1058 }
1059}
1060
1061fn is_idempotent(method: &Method) -> bool {
1062 matches!(*method, Method::GET | Method::HEAD | Method::DELETE)
1063}