1use std::{
2 collections::BTreeMap,
3 io::Write,
4 path::Path,
5 sync::{Arc, OnceLock, RwLock},
6 time::Duration,
7};
8
9use reqwest::{Method, StatusCode, header};
10use serde::{Serialize, de::DeserializeOwned};
11use serde_json::Value;
12use tokio::time;
13
14use super::{AuthInjector, Error};
15use crate::{CliCoreError, Result};
16
/// Upper bound on the number of attempts (the first try included) made by the
/// retrying send helpers.
const MAX_RETRIES: usize = 3;
/// Delay slept before the first retry; the retry loops double it for each
/// further attempt.
const BASE_BACKOFF: Duration = Duration::from_millis(500);
/// User agent reported when [`set_default_user_agent`] has never been called.
const BUILTIN_DEFAULT_USER_AGENT: &str = "cli/dev";
/// Process-wide default user agent, lazily initialised to the built-in value.
static DEFAULT_USER_AGENT: OnceLock<RwLock<String>> = OnceLock::new();

/// Returns the process-wide user-agent cell, creating it with the built-in
/// value on first use.
fn default_user_agent_cell() -> &'static RwLock<String> {
    DEFAULT_USER_AGENT.get_or_init(|| RwLock::new(BUILTIN_DEFAULT_USER_AGENT.to_owned()))
}

/// Replaces the process-wide default user agent.
///
/// The value is read by [`HttpClientBuilder::new`], so only builders created
/// after this call observe it.
///
/// A poisoned lock is recovered instead of being ignored: the stored `String`
/// is always a complete value, so the update is never silently dropped.
pub fn set_default_user_agent(user_agent: impl Into<String>) {
    // Convert before locking so a panicking `Into` impl cannot poison the lock.
    let user_agent = user_agent.into();
    let mut current = default_user_agent_cell()
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    *current = user_agent;
}

/// Returns a copy of the current process-wide default user agent.
///
/// A poisoned lock is recovered so the last stored value is returned rather
/// than silently reverting to the built-in default.
fn default_user_agent() -> String {
    default_user_agent_cell()
        .read()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone()
}
40
/// One entry of the `errors` list in a GraphQL response envelope.
#[derive(serde::Deserialize)]
struct GraphQlError {
    /// Error text; the messages of all entries are joined with "; " when the
    /// envelope is turned into an error.
    message: String,
}
45
/// Top-level shape of a GraphQL response as decoded by this client.
#[derive(Default, serde::Deserialize)]
struct GraphQlEnvelope {
    /// The `data` member, kept as raw JSON until the caller's type is known.
    data: Option<Value>,
    /// The `errors` member; defaults to an empty list when absent.
    #[serde(default)]
    errors: Vec<GraphQlError>,
}
52
/// A structured debug event handed to a [`TransportLogger`].
#[derive(Clone, Debug)]
pub struct TransportLogEvent {
    /// Fixed event name, e.g. `"http request"` or `"retrying request"`.
    pub message: &'static str,
    /// Key/value details of the event (such as `method`, `url`, `status`).
    pub fields: BTreeMap<String, String>,
}
61
/// Sink for the debug events emitted by [`HttpClient`].
///
/// Implementations must be `Send + Sync` because the client stores the logger
/// behind an `Arc`.
pub trait TransportLogger: Send + Sync + std::fmt::Debug {
    /// Receives one debug event.
    fn debug(&self, event: &TransportLogEvent);
}
67
/// Logger that discards every event; the default for [`HttpClientBuilder`].
#[derive(Clone, Debug, Default)]
pub struct NoopTransportLogger;

impl TransportLogger for NoopTransportLogger {
    /// Intentionally does nothing.
    fn debug(&self, _event: &TransportLogEvent) {}
}
75
/// Async HTTP client that prefixes every path with a base URL, injects
/// authentication, and offers JSON, raw, multipart and GraphQL helpers.
#[derive(Clone, Debug)]
pub struct HttpClient {
    /// Underlying reqwest client used to build and execute requests.
    base: reqwest::Client,
    /// Prefix concatenated verbatim with each request path.
    base_url: String,
    /// Hook invoked on every built request before it is executed.
    auth: Arc<dyn AuthInjector>,
    /// Value sent in the `User-Agent` header.
    user_agent: String,
    /// Extra headers added to every request.
    default_headers: BTreeMap<String, String>,
    /// Receiver of debug events.
    logger: Arc<dyn TransportLogger>,
}
91
/// Builder collecting the settings an [`HttpClient`] is created from.
#[derive(Clone, Debug)]
pub struct HttpClientBuilder {
    /// Prefix concatenated verbatim with each request path.
    base_url: String,
    /// Hook invoked on every built request before it is executed.
    auth: Arc<dyn AuthInjector>,
    /// Value sent in the `User-Agent` header.
    user_agent: String,
    /// Extra headers added to every request.
    default_headers: BTreeMap<String, String>,
    /// Receiver of debug events.
    logger: Arc<dyn TransportLogger>,
}
101
102impl HttpClientBuilder {
103 #[must_use]
105 pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
106 Self {
107 base_url: base_url.into(),
108 auth,
109 user_agent: default_user_agent(),
110 default_headers: BTreeMap::new(),
111 logger: Arc::new(NoopTransportLogger),
112 }
113 }
114
115 #[must_use]
117 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
118 self.user_agent = user_agent.into();
119 self
120 }
121
122 #[must_use]
124 pub fn with_user_agent(self, user_agent: impl Into<String>) -> Self {
125 self.user_agent(user_agent)
126 }
127
128 #[must_use]
130 pub fn default_headers(mut self, headers: BTreeMap<String, String>) -> Self {
131 self.default_headers = headers;
132 self
133 }
134
135 #[must_use]
137 pub fn with_default_headers(self, headers: BTreeMap<String, String>) -> Self {
138 self.default_headers(headers)
139 }
140
141 #[must_use]
143 pub fn logger(mut self, logger: Arc<dyn TransportLogger>) -> Self {
144 self.logger = logger;
145 self
146 }
147
148 #[must_use]
150 pub fn with_logger(self, logger: Arc<dyn TransportLogger>) -> Self {
151 self.logger(logger)
152 }
153
154 #[must_use]
156 pub fn build(self) -> HttpClient {
157 HttpClient {
158 base: reqwest::Client::new(),
159 base_url: self.base_url,
160 auth: self.auth,
161 user_agent: self.user_agent,
162 default_headers: self.default_headers,
163 logger: self.logger,
164 }
165 }
166}
167
168impl HttpClient {
169 #[must_use]
171 pub fn builder(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> HttpClientBuilder {
172 HttpClientBuilder::new(base_url, auth)
173 }
174
175 #[must_use]
177 pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
178 HttpClientBuilder::new(base_url, auth).build()
179 }
180
181 pub async fn get<T: Default + DeserializeOwned>(&self, path: &str) -> Result<T> {
183 self.do_json(Method::GET, path, Option::<&()>::None).await
184 }
185
186 pub async fn get_without_response(&self, path: &str) -> Result<()> {
188 self.do_empty(Method::GET, path, Option::<&()>::None).await
189 }
190
191 pub async fn post<B: Serialize, T: Default + DeserializeOwned>(
193 &self,
194 path: &str,
195 body: &B,
196 ) -> Result<T> {
197 self.do_json(Method::POST, path, Some(body)).await
198 }
199
200 pub async fn post_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
202 self.do_empty(Method::POST, path, Some(body)).await
203 }
204
205 pub async fn put<B: Serialize, T: Default + DeserializeOwned>(
207 &self,
208 path: &str,
209 body: &B,
210 ) -> Result<T> {
211 self.do_json(Method::PUT, path, Some(body)).await
212 }
213
214 pub async fn put_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
216 self.do_empty(Method::PUT, path, Some(body)).await
217 }
218
219 pub async fn patch<B: Serialize, T: Default + DeserializeOwned>(
221 &self,
222 path: &str,
223 body: &B,
224 ) -> Result<T> {
225 self.do_json(Method::PATCH, path, Some(body)).await
226 }
227
228 pub async fn patch_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
230 self.do_empty(Method::PATCH, path, Some(body)).await
231 }
232
233 pub async fn delete(&self, path: &str) -> Result<()> {
235 self.do_empty(Method::DELETE, path, Option::<&()>::None)
236 .await
237 }
238
239 pub async fn delete_with_body<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
241 self.do_empty(Method::DELETE, path, Some(body)).await
242 }
243
244 pub async fn get_etag<T: Default + DeserializeOwned>(&self, path: &str) -> Result<(T, String)> {
246 let response = self.send_get_status_only_retry(path).await?;
247 let etag = response
248 .headers()
249 .get(header::ETAG)
250 .and_then(|value| value.to_str().ok())
251 .unwrap_or_default()
252 .to_owned();
253 let value = decode_json_response(response, "GET", path).await?;
254 Ok((value, etag))
255 }
256
257 pub async fn get_etag_without_response(&self, path: &str) -> Result<String> {
259 let response = self.send_get_status_only_retry(path).await?;
260 let etag = response
261 .headers()
262 .get(header::ETAG)
263 .and_then(|value| value.to_str().ok())
264 .unwrap_or_default()
265 .to_owned();
266 ensure_success_response(response, "GET", path).await?;
267 Ok(etag)
268 }
269
270 pub async fn put_if_match<B: Serialize, T: Default + DeserializeOwned>(
272 &self,
273 path: &str,
274 body: &B,
275 etag: &str,
276 ) -> Result<T> {
277 let response = self.send_put_if_match(path, body, etag).await?;
278 decode_json_response(response, "PUT", path).await
279 }
280
281 pub async fn put_if_match_without_response<B: Serialize>(
283 &self,
284 path: &str,
285 body: &B,
286 etag: &str,
287 ) -> Result<()> {
288 let response = self.send_put_if_match(path, body, etag).await?;
289 ensure_success_response(response, "PUT", path).await
290 }
291
292 pub async fn get_raw(&self, path: &str, writer: &mut dyn Write) -> Result<()> {
294 let response = self.send_get_raw_status_only_retry(path).await?;
295 if response.status().is_client_error() || response.status().is_server_error() {
296 return Err(parse_error_response(response, "GET", path).await.into());
297 }
298 let bytes = response
299 .bytes()
300 .await
301 .map_err(|err| CliCoreError::message(format!("transport: stream response: {err}")))?;
302 writer.write_all(&bytes)?;
303 Ok(())
304 }
305
306 pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
308 let response = self.send_get_raw_status_only_retry(path).await?;
309 if response.status().is_client_error() || response.status().is_server_error() {
310 return Err(parse_error_response(response, "GET", path).await.into());
311 }
312 let bytes = response
313 .bytes()
314 .await
315 .map_err(|err| CliCoreError::message(format!("transport: stream response: {err}")))?;
316 Ok(bytes.to_vec())
317 }
318
319 pub async fn post_raw<B: Serialize>(
321 &self,
322 path: &str,
323 body: Option<&B>,
324 writer: &mut dyn Write,
325 ) -> Result<()> {
326 let response = self.send_post_raw_once(path, body).await?;
327 if response.status().is_client_error() || response.status().is_server_error() {
328 return Err(parse_error_response(response, "POST", path).await.into());
329 }
330 let bytes = response
331 .bytes()
332 .await
333 .map_err(|err| CliCoreError::message(format!("transport: stream response: {err}")))?;
334 writer.write_all(&bytes)?;
335 Ok(())
336 }
337
338 pub async fn do_raw<T: Default + DeserializeOwned>(
340 &self,
341 method: Method,
342 path: &str,
343 content_type: &str,
344 body: impl Into<Vec<u8>>,
345 ) -> Result<T> {
346 self.do_raw_optional_body(method, path, content_type, Some(body.into()))
347 .await
348 }
349
350 pub async fn do_raw_optional_body<T: Default + DeserializeOwned>(
352 &self,
353 method: Method,
354 path: &str,
355 content_type: &str,
356 body: Option<Vec<u8>>,
357 ) -> Result<T> {
358 let method_text = method.as_str().to_owned();
359 let response = self.send_raw_once(method, path, content_type, body).await?;
360 decode_json_response(response, &method_text, path).await
361 }
362
363 pub async fn do_raw_without_response(
365 &self,
366 method: Method,
367 path: &str,
368 content_type: &str,
369 body: impl Into<Vec<u8>>,
370 ) -> Result<()> {
371 self.do_raw_optional_body_without_response(method, path, content_type, Some(body.into()))
372 .await
373 }
374
375 pub async fn do_raw_optional_body_without_response(
377 &self,
378 method: Method,
379 path: &str,
380 content_type: &str,
381 body: Option<Vec<u8>>,
382 ) -> Result<()> {
383 let method_text = method.as_str().to_owned();
384 let response = self.send_raw_once(method, path, content_type, body).await?;
385 ensure_success_response(response, &method_text, path).await
386 }
387
388 pub async fn post_multipart<T: Default + DeserializeOwned>(
390 &self,
391 path: &str,
392 field_name: &str,
393 file_path: &Path,
394 ) -> Result<T> {
395 self.post_multipart_with_fields(path, field_name, file_path, &BTreeMap::new())
396 .await
397 }
398
399 pub async fn post_multipart_without_response(
401 &self,
402 path: &str,
403 field_name: &str,
404 file_path: &Path,
405 ) -> Result<()> {
406 self.post_multipart_with_fields_without_response(
407 path,
408 field_name,
409 file_path,
410 &BTreeMap::new(),
411 )
412 .await
413 }
414
415 pub async fn post_multipart_with_fields<T: Default + DeserializeOwned>(
417 &self,
418 path: &str,
419 file_field: &str,
420 file_path: &Path,
421 fields: &BTreeMap<String, String>,
422 ) -> Result<T> {
423 let form = self.multipart_form(file_field, file_path, fields).await?;
424 self.send_multipart(path, form).await
425 }
426
427 async fn multipart_form(
428 &self,
429 file_field: &str,
430 file_path: &Path,
431 fields: &BTreeMap<String, String>,
432 ) -> Result<reqwest::multipart::Form> {
433 let mut form = reqwest::multipart::Form::new();
434 for (key, value) in fields {
435 form = form.text(key.clone(), value.clone());
436 }
437 let file_name = file_path
438 .file_name()
439 .and_then(|name| name.to_str())
440 .unwrap_or("file")
441 .to_owned();
442 let bytes = tokio::fs::read(file_path)
443 .await
444 .map_err(|err| CliCoreError::message(format!("transport: open file: {err}")))?;
445 let part = reqwest::multipart::Part::bytes(bytes).file_name(file_name);
446 form = form.part(file_field.to_owned(), part);
447 Ok(form)
448 }
449
450 pub async fn post_multipart_with_fields_without_response(
452 &self,
453 path: &str,
454 file_field: &str,
455 file_path: &Path,
456 fields: &BTreeMap<String, String>,
457 ) -> Result<()> {
458 let form = self.multipart_form(file_field, file_path, fields).await?;
459 self.send_multipart_without_response(path, form).await
460 }
461
462 pub async fn post_multipart_fields<T: Default + DeserializeOwned>(
464 &self,
465 path: &str,
466 fields: &BTreeMap<String, String>,
467 ) -> Result<T> {
468 let mut form = reqwest::multipart::Form::new();
469 for (key, value) in fields {
470 form = form.text(key.clone(), value.clone());
471 }
472 self.send_multipart(path, form).await
473 }
474
475 pub async fn post_multipart_fields_without_response(
477 &self,
478 path: &str,
479 fields: &BTreeMap<String, String>,
480 ) -> Result<()> {
481 let mut form = reqwest::multipart::Form::new();
482 for (key, value) in fields {
483 form = form.text(key.clone(), value.clone());
484 }
485 self.send_multipart_without_response(path, form).await
486 }
487
488 pub async fn post_graphql<T: DeserializeOwned + Default>(
490 &self,
491 path: &str,
492 query: &str,
493 variables: BTreeMap<String, Value>,
494 ) -> Result<T> {
495 self.post_graphql_optional_variables(path, query, Some(variables))
496 .await
497 }
498
499 pub async fn post_graphql_optional_variables<T: DeserializeOwned + Default>(
501 &self,
502 path: &str,
503 query: &str,
504 variables: Option<BTreeMap<String, Value>>,
505 ) -> Result<T> {
506 let mut result = T::default();
507 self.post_graphql_optional_variables_into(path, query, variables, &mut result)
508 .await?;
509 Ok(result)
510 }
511
512 pub async fn post_graphql_without_response(
514 &self,
515 path: &str,
516 query: &str,
517 variables: BTreeMap<String, Value>,
518 ) -> Result<()> {
519 self.post_graphql_optional_variables_without_response(path, query, Some(variables))
520 .await
521 }
522
523 pub async fn post_graphql_optional_variables_without_response(
525 &self,
526 path: &str,
527 query: &str,
528 variables: Option<BTreeMap<String, Value>>,
529 ) -> Result<()> {
530 self.post_graphql_response_envelope(path, query, variables)
531 .await?;
532 Ok(())
533 }
534
535 pub async fn post_graphql_into<T: DeserializeOwned>(
537 &self,
538 path: &str,
539 query: &str,
540 variables: BTreeMap<String, Value>,
541 result: &mut T,
542 ) -> Result<()> {
543 self.post_graphql_optional_variables_into(path, query, Some(variables), result)
544 .await
545 }
546
547 pub async fn post_graphql_optional_variables_into<T: DeserializeOwned>(
549 &self,
550 path: &str,
551 query: &str,
552 variables: Option<BTreeMap<String, Value>>,
553 result: &mut T,
554 ) -> Result<()> {
555 let envelope = self
556 .post_graphql_response_envelope(path, query, variables)
557 .await?;
558 if let Some(data) = envelope.data
559 && !data.is_null()
560 {
561 *result = serde_json::from_value(data).map_err(|err| {
562 CliCoreError::message(format!("transport: decode graphql data: {err}"))
563 })?;
564 }
565 Ok(())
566 }
567
568 async fn do_json<B: Serialize, T: Default + DeserializeOwned>(
569 &self,
570 method: Method,
571 path: &str,
572 body: Option<&B>,
573 ) -> Result<T> {
574 let method_text = method.as_str().to_owned();
575 let response = self.send_with_retry(method, path, body).await?;
576 decode_json_response(response, &method_text, path).await
577 }
578
579 async fn post_graphql_response_envelope(
580 &self,
581 path: &str,
582 query: &str,
583 variables: Option<BTreeMap<String, Value>>,
584 ) -> Result<GraphQlEnvelope> {
585 #[derive(Serialize)]
586 struct Request<'query> {
587 query: &'query str,
588 variables: Option<BTreeMap<String, Value>>,
589 }
590
591 let envelope: GraphQlEnvelope = self.post(path, &Request { query, variables }).await?;
592 if !envelope.errors.is_empty() {
593 let message = envelope
594 .errors
595 .iter()
596 .map(|error| error.message.as_str())
597 .collect::<Vec<_>>()
598 .join("; ");
599 return Err(CliCoreError::message(format!("graphql: {message}")));
600 }
601 Ok(envelope)
602 }
603
604 async fn send_put_if_match<B: Serialize>(
605 &self,
606 path: &str,
607 body: &B,
608 etag: &str,
609 ) -> Result<reqwest::Response> {
610 let mut request = self
611 .build_request(Method::PUT, path, Some(body))?
612 .header(header::IF_MATCH, etag)
613 .build()
614 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
615 self.inject_auth(&mut request).await?;
616 let url = format!("{}{}", self.base_url, path);
617 self.log_debug(
618 "http request",
619 [("method", "PUT".to_owned()), ("url", url.clone())],
620 );
621 let response = self
622 .base
623 .execute(request)
624 .await
625 .map_err(|err| CliCoreError::message(format!("transport: PUT {path}: {err}")))?;
626 self.log_debug(
627 "http response",
628 [
629 ("status", response.status().as_u16().to_string()),
630 ("method", "PUT".to_owned()),
631 ("url", url),
632 ],
633 );
634 Ok(response)
635 }
636
637 async fn send_multipart<T: Default + DeserializeOwned>(
638 &self,
639 path: &str,
640 form: reqwest::multipart::Form,
641 ) -> Result<T> {
642 let response = self.send_multipart_response(path, form).await?;
643 decode_json_response(response, "POST", path).await
644 }
645
646 async fn send_multipart_without_response(
647 &self,
648 path: &str,
649 form: reqwest::multipart::Form,
650 ) -> Result<()> {
651 let response = self.send_multipart_response(path, form).await?;
652 ensure_success_response(response, "POST", path).await
653 }
654
655 async fn send_multipart_response(
656 &self,
657 path: &str,
658 form: reqwest::multipart::Form,
659 ) -> Result<reqwest::Response> {
660 let url = format!("{}{}", self.base_url, path);
661 let mut builder = self
662 .base
663 .post(url.clone())
664 .header(header::USER_AGENT, self.user_agent.clone())
665 .multipart(form);
666 for (key, value) in &self.default_headers {
667 builder = builder.header(key, value);
668 }
669 let mut request = builder
670 .build()
671 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
672 self.inject_auth(&mut request).await?;
673 self.log_debug("http multipart request", [("url", url)]);
674 self.base
675 .execute(request)
676 .await
677 .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
678 }
679
680 async fn do_empty<B: Serialize>(
681 &self,
682 method: Method,
683 path: &str,
684 body: Option<&B>,
685 ) -> Result<()> {
686 let method_text = method.as_str().to_owned();
687 let response = self.send_with_retry(method, path, body).await?;
688 ensure_success_response(response, &method_text, path).await
689 }
690
691 async fn send_raw_once(
692 &self,
693 method: Method,
694 path: &str,
695 content_type: &str,
696 body: Option<Vec<u8>>,
697 ) -> Result<reqwest::Response> {
698 let url = format!("{}{}", self.base_url, path);
699 let method_text = method.as_str().to_owned();
700 let mut builder = self
701 .base
702 .request(method, url)
703 .header(header::USER_AGENT, self.user_agent.clone());
704 if let Some(body) = body {
705 builder = builder.body(body);
706 }
707 if !content_type.is_empty() {
708 builder = builder.header(header::CONTENT_TYPE, content_type);
709 }
710 for (key, value) in &self.default_headers {
711 builder = builder.header(key, value);
712 }
713 let mut request = builder
714 .build()
715 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
716 self.inject_auth(&mut request).await?;
717 self.log_debug(
718 "http request",
719 [
720 ("method", method_text.clone()),
721 ("url", format!("{}{}", self.base_url, path)),
722 ],
723 );
724 self.base
725 .execute(request)
726 .await
727 .map_err(|err| CliCoreError::message(format!("transport: {method_text} {path}: {err}")))
728 }
729
730 async fn send_get_raw_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
731 let mut last_err = None;
732 for attempt in 0..MAX_RETRIES {
733 if attempt > 0 {
734 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
735 time::sleep(backoff).await;
736 }
737
738 match self.send_get_raw_once(path).await {
739 Ok(response) => {
740 if response.status() == StatusCode::TOO_MANY_REQUESTS
741 || response.status().is_server_error()
742 {
743 last_err = Some(CliCoreError::message(format!(
744 "transport: GET {}: status {}",
745 path,
746 response.status().as_u16()
747 )));
748 continue;
749 }
750 return Ok(response);
751 }
752 Err(err) => last_err = Some(err),
753 }
754 }
755 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
756 }
757
758 async fn send_get_raw_once(&self, path: &str) -> Result<reqwest::Response> {
759 let url = format!("{}{}", self.base_url, path);
760 let mut builder = self
761 .base
762 .get(url.clone())
763 .header(header::USER_AGENT, self.user_agent.clone());
764 for (key, value) in &self.default_headers {
765 builder = builder.header(key, value);
766 }
767 let mut request = builder
768 .build()
769 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
770 self.inject_auth(&mut request).await?;
771 self.log_debug("http raw request", [("url", url)]);
772 self.base
773 .execute(request)
774 .await
775 .map_err(|err| CliCoreError::message(format!("transport: GET {path}: {err}")))
776 }
777
778 async fn send_post_raw_once<B: Serialize>(
779 &self,
780 path: &str,
781 body: Option<&B>,
782 ) -> Result<reqwest::Response> {
783 let mut request = self
784 .build_request(Method::POST, path, body)?
785 .build()
786 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
787 self.inject_auth(&mut request).await?;
788 self.log_debug(
789 "http post raw request",
790 [("url", format!("{}{}", self.base_url, path))],
791 );
792 self.base
793 .execute(request)
794 .await
795 .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
796 }
797
798 async fn send_with_retry<B: Serialize>(
799 &self,
800 method: Method,
801 path: &str,
802 body: Option<&B>,
803 ) -> Result<reqwest::Response> {
804 let mut last_err = None;
805 for attempt in 0..MAX_RETRIES {
806 if attempt > 0 {
807 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
808 self.log_debug(
809 "retrying request",
810 [
811 ("attempt", (attempt + 1).to_string()),
812 ("backoff", format!("{backoff:?}")),
813 ],
814 );
815 time::sleep(backoff).await;
816 }
817
818 match self.send_once(method.clone(), path, body).await {
819 Ok(response) => {
820 if retryable_status(method.clone(), response.status()) {
821 last_err =
822 Some(retryable_status_error(response, method.as_str(), path).await);
823 continue;
824 }
825 return Ok(response);
826 }
827 Err(err) if is_idempotent(&method) => {
828 last_err = Some(err);
829 }
830 Err(err) => return Err(err),
831 }
832 }
833 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
834 }
835
836 async fn send_get_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
837 let mut last_err = None;
838 for attempt in 0..MAX_RETRIES {
839 if attempt > 0 {
840 let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
841 self.log_debug(
842 "retrying request",
843 [
844 ("attempt", (attempt + 1).to_string()),
845 ("backoff", format!("{backoff:?}")),
846 ],
847 );
848 time::sleep(backoff).await;
849 }
850
851 match self.send_once(Method::GET, path, Option::<&()>::None).await {
852 Ok(response) => {
853 if response.status() == StatusCode::TOO_MANY_REQUESTS
854 || response.status().is_server_error()
855 {
856 last_err = Some(CliCoreError::message(format!(
857 "transport: GET {}: status {}",
858 path,
859 response.status().as_u16()
860 )));
861 continue;
862 }
863 return Ok(response);
864 }
865 Err(err) => last_err = Some(err),
866 }
867 }
868 Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
869 }
870
871 async fn send_once<B: Serialize>(
872 &self,
873 method: Method,
874 path: &str,
875 body: Option<&B>,
876 ) -> Result<reqwest::Response> {
877 let mut request = self
878 .build_request(method.clone(), path, body)?
879 .build()
880 .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
881 self.inject_auth(&mut request).await?;
882 let method_text = method.as_str().to_owned();
883 self.log_debug(
884 "http request",
885 [
886 ("method", method_text.clone()),
887 ("url", format!("{}{}", self.base_url, path)),
888 ],
889 );
890 let response = self.base.execute(request).await.map_err(|err| {
891 CliCoreError::message(format!("transport: {method_text} {path}: {err}"))
892 })?;
893 self.log_debug(
894 "http response",
895 [
896 ("status", response.status().as_u16().to_string()),
897 ("method", method_text),
898 ("url", format!("{}{}", self.base_url, path)),
899 ],
900 );
901 Ok(response)
902 }
903
904 fn build_request<B: Serialize>(
905 &self,
906 method: Method,
907 path: &str,
908 body: Option<&B>,
909 ) -> Result<reqwest::RequestBuilder> {
910 let url = format!("{}{}", self.base_url, path);
911 let mut builder = self
912 .base
913 .request(method, url)
914 .header(header::USER_AGENT, self.user_agent.clone());
915 if let Some(body) = body {
916 let body = serde_json::to_vec(body)
917 .map_err(|err| CliCoreError::message(format!("transport: marshal body: {err}")))?;
918 builder = builder
919 .header(header::CONTENT_TYPE, "application/json")
920 .body(body);
921 }
922 for (key, value) in &self.default_headers {
923 builder = builder.header(key, value);
924 }
925 Ok(builder)
926 }
927
928 fn log_debug(
929 &self,
930 message: &'static str,
931 fields: impl IntoIterator<Item = (&'static str, String)>,
932 ) {
933 self.logger.debug(&TransportLogEvent {
934 message,
935 fields: fields
936 .into_iter()
937 .map(|(key, value)| (key.to_owned(), value))
938 .collect(),
939 });
940 }
941
942 async fn inject_auth(&self, request: &mut reqwest::Request) -> Result<()> {
943 self.auth
944 .inject(request)
945 .await
946 .map_err(|err| CliCoreError::message(format!("transport: auth inject: {err}")))
947 }
948}
949
950async fn decode_json_response<T: Default + DeserializeOwned>(
951 response: reqwest::Response,
952 method: &str,
953 path: &str,
954) -> Result<T> {
955 if response.status().is_client_error() || response.status().is_server_error() {
956 return Err(parse_error_response(response, method, path).await.into());
957 }
958 if response.status() == StatusCode::NO_CONTENT {
959 return Ok(T::default());
960 }
961 let body = response
962 .bytes()
963 .await
964 .map_err(|err| CliCoreError::message(format!("transport: decode response: {err}")))?;
965 if body.trim_ascii() == b"null" {
966 return Ok(T::default());
967 }
968 serde_json::from_slice::<T>(&body)
969 .map_err(|err| CliCoreError::message(format!("transport: decode response: {err}")))
970}
971
972async fn ensure_success_response(
973 response: reqwest::Response,
974 method: &str,
975 path: &str,
976) -> Result<()> {
977 if response.status().is_client_error() || response.status().is_server_error() {
978 return Err(parse_error_response(response, method, path).await.into());
979 }
980 Ok(())
981}
982
983pub async fn parse_error_response(response: reqwest::Response, method: &str, path: &str) -> Error {
990 let status = response.status();
991 let body = response.text().await.unwrap_or_default();
992 parse_error_body(status, &body, method, path)
993}
994
995fn parse_error_body(status: StatusCode, body: &str, method: &str, path: &str) -> Error {
996 if let Ok(mut api_error) = serde_json::from_str::<Error>(body)
997 && !api_error.message.is_empty()
998 {
999 api_error.code = format!("HTTP_{}", status.as_u16());
1000 return api_error;
1001 }
1002 Error {
1003 code: format!("HTTP_{}", status.as_u16()),
1004 message: format!("{} {}: {} {}", method, path, status.as_u16(), body),
1005 system: String::new(),
1006 request_id: String::new(),
1007 }
1008}
1009
1010fn retryable_status(method: Method, status: StatusCode) -> bool {
1011 status == StatusCode::TOO_MANY_REQUESTS || (status.is_server_error() && is_idempotent(&method))
1012}
1013
1014async fn retryable_status_error(
1015 response: reqwest::Response,
1016 method: &str,
1017 path: &str,
1018) -> CliCoreError {
1019 let status = response.status().as_u16();
1020 match response.text().await {
1021 Ok(body) => CliCoreError::message(format!(
1022 "transport: {method} {path}: status {status}: {body}"
1023 )),
1024 Err(err) => CliCoreError::message(format!(
1025 "transport: {method} {path}: status {status} (body read failed: {err})"
1026 )),
1027 }
1028}
1029
1030fn is_idempotent(method: &Method) -> bool {
1031 matches!(*method, Method::GET | Method::HEAD | Method::DELETE)
1032}