1use std::{
2 collections::BTreeMap,
3 io::Write,
4 path::Path,
5 sync::{Arc, OnceLock, RwLock},
6 time::Duration,
7};
8
9use reqwest::{Method, StatusCode, header};
10use serde::{Serialize, de::DeserializeOwned};
11use serde_json::Value;
12use tokio::time;
13
14use super::{AuthInjector, Error};
15use crate::{CliCoreError, Result};
16
17const MAX_RETRIES: usize = 3;
18const BASE_BACKOFF: Duration = Duration::from_millis(500);
19const BUILTIN_DEFAULT_USER_AGENT: &str = "cli/dev";
20static DEFAULT_USER_AGENT: OnceLock<RwLock<String>> = OnceLock::new();
21
22pub fn set_default_user_agent(user_agent: impl Into<String>) {
24 let lock =
25 DEFAULT_USER_AGENT.get_or_init(|| RwLock::new(BUILTIN_DEFAULT_USER_AGENT.to_owned()));
26 if let Ok(mut current) = lock.write() {
27 *current = user_agent.into();
28 }
29}
30
31fn default_user_agent() -> String {
32 DEFAULT_USER_AGENT
33 .get_or_init(|| RwLock::new(BUILTIN_DEFAULT_USER_AGENT.to_owned()))
34 .read()
35 .map_or_else(
36 |_| BUILTIN_DEFAULT_USER_AGENT.to_owned(),
37 |value| value.clone(),
38 )
39}
40
41#[derive(serde::Deserialize)]
42struct GraphQlError {
43 message: String,
44}
45
46#[derive(Default, serde::Deserialize)]
47struct GraphQlEnvelope {
48 data: Option<Value>,
49 #[serde(default)]
50 errors: Vec<GraphQlError>,
51}
52
53#[derive(Clone, Debug)]
55pub struct TransportLogEvent {
56 pub message: &'static str,
58 pub fields: BTreeMap<String, String>,
60}
61
62pub trait TransportLogger: Send + Sync + std::fmt::Debug {
64 fn debug(&self, event: &TransportLogEvent);
66}
67
68#[derive(Clone, Debug, Default)]
70pub struct NoopTransportLogger;
71
72impl TransportLogger for NoopTransportLogger {
73 fn debug(&self, _event: &TransportLogEvent) {}
74}
75
76#[derive(Clone, Debug)]
83pub struct HttpClient {
84 base: reqwest::Client,
85 base_url: String,
86 auth: Arc<dyn AuthInjector>,
87 user_agent: String,
88 default_headers: BTreeMap<String, String>,
89 logger: Arc<dyn TransportLogger>,
90}
91
92#[derive(Clone, Debug)]
94pub struct HttpClientBuilder {
95 base_url: String,
96 auth: Arc<dyn AuthInjector>,
97 user_agent: String,
98 default_headers: BTreeMap<String, String>,
99 logger: Arc<dyn TransportLogger>,
100}
101
102impl HttpClientBuilder {
103 #[must_use]
105 pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
106 Self {
107 base_url: base_url.into(),
108 auth,
109 user_agent: default_user_agent(),
110 default_headers: BTreeMap::new(),
111 logger: Arc::new(NoopTransportLogger),
112 }
113 }
114
115 #[must_use]
117 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
118 self.user_agent = user_agent.into();
119 self
120 }
121
122 #[must_use]
124 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Self {
125 self.user_agent(user_agent)
126 }
127
128 #[must_use]
130 pub fn default_headers(mut self, headers: BTreeMap<String, String>) -> Self {
131 self.default_headers = headers;
132 self
133 }
134
135 #[must_use]
137 pub fn with_default_headers(self, headers: BTreeMap<String, String>) -> Self {
138 self.default_headers(headers)
139 }
140
141 #[must_use]
143 pub fn logger(mut self, logger: Arc<dyn TransportLogger>) -> Self {
144 self.logger = logger;
145 self
146 }
147
148 #[must_use]
150 pub fn with_logger(self, logger: Arc<dyn TransportLogger>) -> Self {
151 self.logger(logger)
152 }
153
154 #[must_use]
156 pub fn build(self) -> HttpClient {
157 HttpClient {
158 base: reqwest::Client::new(),
159 base_url: self.base_url,
160 auth: self.auth,
161 user_agent: self.user_agent,
162 default_headers: self.default_headers,
163 logger: self.logger,
164 }
165 }
166}
167
168impl HttpClient {
169 #[must_use]
171 pub fn builder(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> HttpClientBuilder {
172 HttpClientBuilder::new(base_url, auth)
173 }
174
175 #[must_use]
177 pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
178 HttpClientBuilder::new(base_url, auth).build()
179 }
180
181 pub async fn get<T: Default + DeserializeOwned>(&self, path: &str) -> Result<T> {
183 self.do_json(Method::GET, path, Option::<&()>::None).await
184 }
185
186 pub async fn get_without_response(&self, path: &str) -> Result<()> {
188 self.do_empty(Method::GET, path, Option::<&()>::None).await
189 }
190
191 pub async fn post<B: Serialize, T: Default + DeserializeOwned>(
193 &self,
194 path: &str,
195 body: &B,
196 ) -> Result<T> {
197 self.do_json(Method::POST, path, Some(body)).await
198 }
199
200 pub async fn post_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
202 self.do_empty(Method::POST, path, Some(body)).await
203 }
204
205 pub async fn put<B: Serialize, T: Default + DeserializeOwned>(
207 &self,
208 path: &str,
209 body: &B,
210 ) -> Result<T> {
211 self.do_json(Method::PUT, path, Some(body)).await
212 }
213
214 pub async fn put_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
216 self.do_empty(Method::PUT, path, Some(body)).await
217 }
218
219 pub async fn patch<B: Serialize, T: Default + DeserializeOwned>(
221 &self,
222 path: &str,
223 body: &B,
224 ) -> Result<T> {
225 self.do_json(Method::PATCH, path, Some(body)).await
226 }
227
228 pub async fn patch_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
230 self.do_empty(Method::PATCH, path, Some(body)).await
231 }
232
233 pub async fn delete(&self, path: &str) -> Result<()> {
235 self.do_empty(Method::DELETE, path, Option::<&()>::None)
236 .await
237 }
238
239 pub async fn delete_with_body<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
241 self.do_empty(Method::DELETE, path, Some(body)).await
242 }
243
244 pub async fn get_etag<T: Default + DeserializeOwned>(&self, path: &str) -> Result<(T, String)> {
246 let response = self.send_get_status_only_retry(path).await?;
247 let etag = response
248 .headers()
249 .get(header::ETAG)
250 .and_then(|value| value.to_str().ok())
251 .unwrap_or_default()
252 .to_owned();
253 let value = decode_json_response(response, "GET", path).await?;
254 Ok((value, etag))
255 }
256
257 pub async fn get_etag_without_response(&self, path: &str) -> Result<String> {
259 let response = self.send_get_status_only_retry(path).await?;
260 let etag = response
261 .headers()
262 .get(header::ETAG)
263 .and_then(|value| value.to_str().ok())
264 .unwrap_or_default()
265 .to_owned();
266 ensure_success_response(response, "GET", path).await?;
267 Ok(etag)
268 }
269
270 pub async fn put_if_match<B: Serialize, T: Default + DeserializeOwned>(
272 &self,
273 path: &str,
274 body: &B,
275 etag: &str,
276 ) -> Result<T> {
277 let response = self.send_put_if_match(path, body, etag).await?;
278 decode_json_response(response, "PUT", path).await
279 }
280
281 pub async fn put_if_match_without_response<B: Serialize>(
283 &self,
284 path: &str,
285 body: &B,
286 etag: &str,
287 ) -> Result<()> {
288 let response = self.send_put_if_match(path, body, etag).await?;
289 ensure_success_response(response, "PUT", path).await
290 }
291
292 pub async fn get_raw(&self, path: &str, writer: &mut dyn Write) -> Result<()> {
294 let response = self.send_get_raw_status_only_retry(path).await?;
295 if response.status().is_client_error() || response.status().is_server_error() {
296 return Err(parse_error_response(response, "GET", path).await.into());
297 }
298 let bytes = response
299 .bytes()
300 .await
301 .map_err(|err| CliCoreError::message(format!("transport: stream response: {err}")))?;
302 writer.write_all(&bytes)?;
303 Ok(())
304 }
305
306 pub async fn post_raw<B: Serialize>(
308 &self,
309 path: &str,
310 body: Option<&B>,
311 writer: &mut dyn Write,
312 ) -> Result<()> {
313 let response = self.send_post_raw_once(path, body).await?;
314 if response.status().is_client_error() || response.status().is_server_error() {
315 return Err(parse_error_response(response, "POST", path).await.into());
316 }
317 let bytes = response
318 .bytes()
319 .await
320 .map_err(|err| CliCoreError::message(format!("transport: stream response: {err}")))?;
321 writer.write_all(&bytes)?;
322 Ok(())
323 }
324
325 pub async fn do_raw<T: Default + DeserializeOwned>(
327 &self,
328 method: Method,
329 path: &str,
330 content_type: &str,
331 body: impl Into<Vec<u8>>,
332 ) -> Result<T> {
333 self.do_raw_optional_body(method, path, content_type, Some(body.into()))
334 .await
335 }
336
337 pub async fn do_raw_optional_body<T: Default + DeserializeOwned>(
339 &self,
340 method: Method,
341 path: &str,
342 content_type: &str,
343 body: Option<Vec<u8>>,
344 ) -> Result<T> {
345 let method_text = method.as_str().to_owned();
346 let response = self.send_raw_once(method, path, content_type, body).await?;
347 decode_json_response(response, &method_text, path).await
348 }
349
350 pub async fn do_raw_without_response(
352 &self,
353 method: Method,
354 path: &str,
355 content_type: &str,
356 body: impl Into<Vec<u8>>,
357 ) -> Result<()> {
358 self.do_raw_optional_body_without_response(method, path, content_type, Some(body.into()))
359 .await
360 }
361
362 pub async fn do_raw_optional_body_without_response(
364 &self,
365 method: Method,
366 path: &str,
367 content_type: &str,
368 body: Option<Vec<u8>>,
369 ) -> Result<()> {
370 let method_text = method.as_str().to_owned();
371 let response = self.send_raw_once(method, path, content_type, body).await?;
372 ensure_success_response(response, &method_text, path).await
373 }
374
375 pub async fn post_multipart<T: Default + DeserializeOwned>(
377 &self,
378 path: &str,
379 field_name: &str,
380 file_path: &Path,
381 ) -> Result<T> {
382 self.post_multipart_with_fields(path, field_name, file_path, &BTreeMap::new())
383 .await
384 }
385
386 pub async fn post_multipart_without_response(
388 &self,
389 path: &str,
390 field_name: &str,
391 file_path: &Path,
392 ) -> Result<()> {
393 self.post_multipart_with_fields_without_response(
394 path,
395 field_name,
396 file_path,
397 &BTreeMap::new(),
398 )
399 .await
400 }
401
402 pub async fn post_multipart_with_fields<T: Default + DeserializeOwned>(
404 &self,
405 path: &str,
406 file_field: &str,
407 file_path: &Path,
408 fields: &BTreeMap<String, String>,
409 ) -> Result<T> {
410 let form = self.multipart_form(file_field, file_path, fields).await?;
411 self.send_multipart(path, form).await
412 }
413
414 async fn multipart_form(
415 &self,
416 file_field: &str,
417 file_path: &Path,
418 fields: &BTreeMap<String, String>,
419 ) -> Result<reqwest::multipart::Form> {
420 let mut form = reqwest::multipart::Form::new();
421 for (key, value) in fields {
422 form = form.text(key.clone(), value.clone());
423 }
424 let file_name = file_path
425 .file_name()
426 .and_then(|name| name.to_str())
427 .unwrap_or("file")
428 .to_owned();
429 let bytes = tokio::fs::read(file_path)
430 .await
431 .map_err(|err| CliCoreError::message(format!("transport: open file: {err}")))?;
432 let part = reqwest::multipart::Part::bytes(bytes).file_name(file_name);
433 form = form.part(file_field.to_owned(), part);
434 Ok(form)
435 }
436
437 pub async fn post_multipart_with_fields_without_response(
439 &self,
440 path: &str,
441 file_field: &str,
442 file_path: &Path,
443 fields: &BTreeMap<String, String>,
444 ) -> Result<()> {
445 let form = self.multipart_form(file_field, file_path, fields).await?;
446 self.send_multipart_without_response(path, form).await
447 }
448
449 pub async fn post_multipart_fields<T: Default + DeserializeOwned>(
451 &self,
452 path: &str,
453 fields: &BTreeMap<String, String>,
454 ) -> Result<T> {
455 let mut form = reqwest::multipart::Form::new();
456 for (key, value) in fields {
457 form = form.text(key.clone(), value.clone());
458 }
459 self.send_multipart(path, form).await
460 }
461
462 pub async fn post_multipart_fields_without_response(
464 &self,
465 path: &str,
466 fields: &BTreeMap<String, String>,
467 ) -> Result<()> {
468 let mut form = reqwest::multipart::Form::new();
469 for (key, value) in fields {
470 form = form.text(key.clone(), value.clone());
471 }
472 self.send_multipart_without_response(path, form).await
473 }
474
475 pub async fn post_graphql<T: DeserializeOwned + Default>(
477 &self,
478 path: &str,
479 query: &str,
480 variables: BTreeMap<String, Value>,
481 ) -> Result<T> {
482 self.post_graphql_optional_variables(path, query, Some(variables))
483 .await
484 }
485
486 pub async fn post_graphql_optional_variables<T: DeserializeOwned + Default>(
488 &self,
489 path: &str,
490 query: &str,
491 variables: Option<BTreeMap<String, Value>>,
492 ) -> Result<T> {
493 let mut result = T::default();
494 self.post_graphql_optional_variables_into(path, query, variables, &mut result)
495 .await?;
496 Ok(result)
497 }
498
499 pub async fn post_graphql_without_response(
501 &self,
502 path: &str,
503 query: &str,
504 variables: BTreeMap<String, Value>,
505 ) -> Result<()> {
506 self.post_graphql_optional_variables_without_response(path, query, Some(variables))
507 .await
508 }
509
510 pub async fn post_graphql_optional_variables_without_response(
512 &self,
513 path: &str,
514 query: &str,
515 variables: Option<BTreeMap<String, Value>>,
516 ) -> Result<()> {
517 self.post_graphql_response_envelope(path, query, variables)
518 .await?;
519 Ok(())
520 }
521
522 pub async fn post_graphql_into<T: DeserializeOwned>(
524 &self,
525 path: &str,
526 query: &str,
527 variables: BTreeMap<String, Value>,
528 result: &mut T,
529 ) -> Result<()> {
530 self.post_graphql_optional_variables_into(path, query, Some(variables), result)
531 .await
532 }
533
534 pub async fn post_graphql_optional_variables_into<T: DeserializeOwned>(
536 &self,
537 path: &str,
538 query: &str,
539 variables: Option<BTreeMap<String, Value>>,
540 result: &mut T,
541 ) -> Result<()> {
542 let envelope = self
543 .post_graphql_response_envelope(path, query, variables)
544 .await?;
545 if let Some(data) = envelope.data
546 && !data.is_null()
547 {
548 *result = serde_json::from_value(data).map_err(|err| {
549 CliCoreError::message(format!("transport: decode graphql data: {err}"))
550 })?;
551 }
552 Ok(())
553 }
554
555 async fn do_json<B: Serialize, T: Default + DeserializeOwned>(
556 &self,
557 method: Method,
558 path: &str,
559 body: Option<&B>,
560 ) -> Result<T> {
561 let method_text = method.as_str().to_owned();
562 let response = self.send_with_retry(method, path, body).await?;
563 decode_json_response(response, &method_text, path).await
564 }
565
566 async fn post_graphql_response_envelope(
567 &self,
568 path: &str,
569 query: &str,
570 variables: Option<BTreeMap<String, Value>>,
571 ) -> Result<GraphQlEnvelope> {
572 #[derive(Serialize)]
573 struct Request<'query> {
574 query: &'query str,
575 variables: Option<BTreeMap<String, Value>>,
576 }
577
578 let envelope: GraphQlEnvelope = self.post(path, &Request { query, variables }).await?;
579 if !envelope.errors.is_empty() {
580 let message = envelope
581 .errors
582 .iter()
583 .map(|error| error.message.as_str())
584 .collect::<Vec<_>>()
585 .join("; ");
586 return Err(CliCoreError::message(format!("graphql: {message}")));
587 }
588 Ok(envelope)
589 }
590
591 async fn send_put_if_match<B: Serialize>(
592 &self,
593 path: &str,
594 body: &B,
595 etag: &str,
596 ) -> Result<reqwest::Response> {
597 let mut request = self
598 .build_request(Method::PUT, path, Some(body))?
599 .header(header::IF_MATCH, etag)
600 .build()
601 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
602 self.inject_auth(&mut request).await?;
603 let url = format!("{}{}", self.base_url, path);
604 self.log_debug(
605 "http request",
606 [("method", "PUT".to_owned()), ("url", url.clone())],
607 );
608 let response = self
609 .base
610 .execute(request)
611 .await
612 .map_err(|err| CliCoreError::message(format!("transport: PUT {path}: {err}")))?;
613 self.log_debug(
614 "http response",
615 [
616 ("status", response.status().as_u16().to_string()),
617 ("method", "PUT".to_owned()),
618 ("url", url),
619 ],
620 );
621 Ok(response)
622 }
623
624 async fn send_multipart<T: Default + DeserializeOwned>(
625 &self,
626 path: &str,
627 form: reqwest::multipart::Form,
628 ) -> Result<T> {
629 let response = self.send_multipart_response(path, form).await?;
630 decode_json_response(response, "POST", path).await
631 }
632
633 async fn send_multipart_without_response(
634 &self,
635 path: &str,
636 form: reqwest::multipart::Form,
637 ) -> Result<()> {
638 let response = self.send_multipart_response(path, form).await?;
639 ensure_success_response(response, "POST", path).await
640 }
641
642 async fn send_multipart_response(
643 &self,
644 path: &str,
645 form: reqwest::multipart::Form,
646 ) -> Result<reqwest::Response> {
647 let url = format!("{}{}", self.base_url, path);
648 let mut builder = self
649 .base
650 .post(url.clone())
651 .header(header::USER_AGENT, self.user_agent.clone())
652 .multipart(form);
653 for (key, value) in &self.default_headers {
654 builder = builder.header(key, value);
655 }
656 let mut request = builder
657 .build()
658 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
659 self.inject_auth(&mut request).await?;
660 self.log_debug("http multipart request", [("url", url)]);
661 self.base
662 .execute(request)
663 .await
664 .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
665 }
666
667 async fn do_empty<B: Serialize>(
668 &self,
669 method: Method,
670 path: &str,
671 body: Option<&B>,
672 ) -> Result<()> {
673 let method_text = method.as_str().to_owned();
674 let response = self.send_with_retry(method, path, body).await?;
675 ensure_success_response(response, &method_text, path).await
676 }
677
678 async fn send_raw_once(
679 &self,
680 method: Method,
681 path: &str,
682 content_type: &str,
683 body: Option<Vec<u8>>,
684 ) -> Result<reqwest::Response> {
685 let url = format!("{}{}", self.base_url, path);
686 let method_text = method.as_str().to_owned();
687 let mut builder = self
688 .base
689 .request(method, url)
690 .header(header::USER_AGENT, self.user_agent.clone());
691 if let Some(body) = body {
692 builder = builder.body(body);
693 }
694 if !content_type.is_empty() {
695 builder = builder.header(header::CONTENT_TYPE, content_type);
696 }
697 for (key, value) in &self.default_headers {
698 builder = builder.header(key, value);
699 }
700 let mut request = builder
701 .build()
702 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
703 self.inject_auth(&mut request).await?;
704 self.log_debug(
705 "http request",
706 [
707 ("method", method_text.clone()),
708 ("url", format!("{}{}", self.base_url, path)),
709 ],
710 );
711 self.base
712 .execute(request)
713 .await
714 .map_err(|err| CliCoreError::message(format!("transport: {method_text} {path}: {err}")))
715 }
716
717 async fn send_get_raw_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
718 let mut last_err = None;
719 for attempt in 0..MAX_RETRIES {
720 if attempt > 0 {
721 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
722 time::sleep(backoff).await;
723 }
724
725 match self.send_get_raw_once(path).await {
726 Ok(response) => {
727 if response.status() == StatusCode::TOO_MANY_REQUESTS
728 || response.status().is_server_error()
729 {
730 last_err = Some(CliCoreError::message(format!(
731 "transport: GET {}: status {}",
732 path,
733 response.status().as_u16()
734 )));
735 continue;
736 }
737 return Ok(response);
738 }
739 Err(err) => last_err = Some(err),
740 }
741 }
742 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
743 }
744
745 async fn send_get_raw_once(&self, path: &str) -> Result<reqwest::Response> {
746 let url = format!("{}{}", self.base_url, path);
747 let mut builder = self
748 .base
749 .get(url.clone())
750 .header(header::USER_AGENT, self.user_agent.clone());
751 for (key, value) in &self.default_headers {
752 builder = builder.header(key, value);
753 }
754 let mut request = builder
755 .build()
756 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
757 self.inject_auth(&mut request).await?;
758 self.log_debug("http raw request", [("url", url)]);
759 self.base
760 .execute(request)
761 .await
762 .map_err(|err| CliCoreError::message(format!("transport: GET {path}: {err}")))
763 }
764
765 async fn send_post_raw_once<B: Serialize>(
766 &self,
767 path: &str,
768 body: Option<&B>,
769 ) -> Result<reqwest::Response> {
770 let mut request = self
771 .build_request(Method::POST, path, body)?
772 .build()
773 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
774 self.inject_auth(&mut request).await?;
775 self.log_debug(
776 "http post raw request",
777 [("url", format!("{}{}", self.base_url, path))],
778 );
779 self.base
780 .execute(request)
781 .await
782 .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
783 }
784
785 async fn send_with_retry<B: Serialize>(
786 &self,
787 method: Method,
788 path: &str,
789 body: Option<&B>,
790 ) -> Result<reqwest::Response> {
791 let mut last_err = None;
792 for attempt in 0..MAX_RETRIES {
793 if attempt > 0 {
794 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
795 self.log_debug(
796 "retrying request",
797 [
798 ("attempt", (attempt + 1).to_string()),
799 ("backoff", format!("{backoff:?}")),
800 ],
801 );
802 time::sleep(backoff).await;
803 }
804
805 match self.send_once(method.clone(), path, body).await {
806 Ok(response) => {
807 if retryable_status(method.clone(), response.status()) {
808 last_err =
809 Some(retryable_status_error(response, method.as_str(), path).await);
810 continue;
811 }
812 return Ok(response);
813 }
814 Err(err) if is_idempotent(&method) => {
815 last_err = Some(err);
816 }
817 Err(err) => return Err(err),
818 }
819 }
820 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
821 }
822
823 async fn send_get_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
824 let mut last_err = None;
825 for attempt in 0..MAX_RETRIES {
826 if attempt > 0 {
827 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
828 self.log_debug(
829 "retrying request",
830 [
831 ("attempt", (attempt + 1).to_string()),
832 ("backoff", format!("{backoff:?}")),
833 ],
834 );
835 time::sleep(backoff).await;
836 }
837
838 match self.send_once(Method::GET, path, Option::<&()>::None).await {
839 Ok(response) => {
840 if response.status() == StatusCode::TOO_MANY_REQUESTS
841 || response.status().is_server_error()
842 {
843 last_err = Some(CliCoreError::message(format!(
844 "transport: GET {}: status {}",
845 path,
846 response.status().as_u16()
847 )));
848 continue;
849 }
850 return Ok(response);
851 }
852 Err(err) => last_err = Some(err),
853 }
854 }
855 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
856 }
857
858 async fn send_once<B: Serialize>(
859 &self,
860 method: Method,
861 path: &str,
862 body: Option<&B>,
863 ) -> Result<reqwest::Response> {
864 let mut request = self
865 .build_request(method.clone(), path, body)?
866 .build()
867 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
868 self.inject_auth(&mut request).await?;
869 let method_text = method.as_str().to_owned();
870 self.log_debug(
871 "http request",
872 [
873 ("method", method_text.clone()),
874 ("url", format!("{}{}", self.base_url, path)),
875 ],
876 );
877 let response = self.base.execute(request).await.map_err(|err| {
878 CliCoreError::message(format!("transport: {method_text} {path}: {err}"))
879 })?;
880 self.log_debug(
881 "http response",
882 [
883 ("status", response.status().as_u16().to_string()),
884 ("method", method_text),
885 ("url", format!("{}{}", self.base_url, path)),
886 ],
887 );
888 Ok(response)
889 }
890
891 fn build_request<B: Serialize>(
892 &self,
893 method: Method,
894 path: &str,
895 body: Option<&B>,
896 ) -> Result<reqwest::RequestBuilder> {
897 let url = format!("{}{}", self.base_url, path);
898 let mut builder = self
899 .base
900 .request(method, url)
901 .header(header::USER_AGENT, self.user_agent.clone());
902 if let Some(body) = body {
903 let body = serde_json::to_vec(body)
904 .map_err(|err| CliCoreError::message(format!("transport: marshal body: {err}")))?;
905 builder = builder
906 .header(header::CONTENT_TYPE, "application/json")
907 .body(body);
908 }
909 for (key, value) in &self.default_headers {
910 builder = builder.header(key, value);
911 }
912 Ok(builder)
913 }
914
915 fn log_debug(
916 &self,
917 message: &'static str,
918 fields: impl IntoIterator<Item = (&'static str, String)>,
919 ) {
920 self.logger.debug(&TransportLogEvent {
921 message,
922 fields: fields
923 .into_iter()
924 .map(|(key, value)| (key.to_owned(), value))
925 .collect(),
926 });
927 }
928
929 async fn inject_auth(&self, request: &mut reqwest::Request) -> Result<()> {
930 self.auth
931 .inject(request)
932 .await
933 .map_err(|err| CliCoreError::message(format!("transport: auth inject: {err}")))
934 }
935}
936
937async fn decode_json_response<T: Default + DeserializeOwned>(
938 response: reqwest::Response,
939 method: &str,
940 path: &str,
941) -> Result<T> {
942 if response.status().is_client_error() || response.status().is_server_error() {
943 return Err(parse_error_response(response, method, path).await.into());
944 }
945 if response.status() == StatusCode::NO_CONTENT {
946 return Ok(T::default());
947 }
948 let body = response
949 .bytes()
950 .await
951 .map_err(|err| CliCoreError::message(format!("transport: decode response: {err}")))?;
952 if body.trim_ascii() == b"null" {
953 return Ok(T::default());
954 }
955 serde_json::from_slice::<T>(&body)
956 .map_err(|err| CliCoreError::message(format!("transport: decode response: {err}")))
957}
958
959async fn ensure_success_response(
960 response: reqwest::Response,
961 method: &str,
962 path: &str,
963) -> Result<()> {
964 if response.status().is_client_error() || response.status().is_server_error() {
965 return Err(parse_error_response(response, method, path).await.into());
966 }
967 Ok(())
968}
969
970pub async fn parse_error_response(response: reqwest::Response, method: &str, path: &str) -> Error {
977 let status = response.status();
978 let body = response.text().await.unwrap_or_default();
979 parse_error_body(status, &body, method, path)
980}
981
982fn parse_error_body(status: StatusCode, body: &str, method: &str, path: &str) -> Error {
983 if let Ok(mut api_error) = serde_json::from_str::<Error>(body)
984 && !api_error.message.is_empty()
985 {
986 api_error.code = format!("HTTP_{}", status.as_u16());
987 return api_error;
988 }
989 Error {
990 code: format!("HTTP_{}", status.as_u16()),
991 message: format!("{} {}: {} {}", method, path, status.as_u16(), body),
992 system: String::new(),
993 request_id: String::new(),
994 }
995}
996
997fn retryable_status(method: Method, status: StatusCode) -> bool {
998 status == StatusCode::TOO_MANY_REQUESTS || (status.is_server_error() && is_idempotent(&method))
999}
1000
1001async fn retryable_status_error(
1002 response: reqwest::Response,
1003 method: &str,
1004 path: &str,
1005) -> CliCoreError {
1006 let status = response.status().as_u16();
1007 match response.text().await {
1008 Ok(body) => CliCoreError::message(format!(
1009 "transport: {method} {path}: status {status}: {body}"
1010 )),
1011 Err(err) => CliCoreError::message(format!(
1012 "transport: {method} {path}: status {status} (body read failed: {err})"
1013 )),
1014 }
1015}
1016
1017fn is_idempotent(method: &Method) -> bool {
1018 matches!(*method, Method::GET | Method::HEAD | Method::DELETE)
1019}