Skip to main content

cli_engine/transport/
client.rs

1use std::{
2    collections::BTreeMap,
3    io::Write,
4    path::Path,
5    sync::{Arc, OnceLock, RwLock},
6    time::Duration,
7};
8
9use reqwest::{Method, StatusCode, header};
10use serde::{Serialize, de::DeserializeOwned};
11use serde_json::Value;
12use tokio::time;
13
14use super::{AuthInjector, Error};
15use crate::{CliCoreError, Result};
16
/// Maximum number of attempts made by the retrying send helpers, the first
/// attempt included.
const MAX_RETRIES: usize = 3;
/// Delay before the first retry; the delay doubles for each later retry.
const BASE_BACKOFF: Duration = Duration::from_millis(500);
/// User-agent used until [`set_default_user_agent`] installs another value.
const BUILTIN_DEFAULT_USER_AGENT: &str = "cli/dev";
/// Process-wide default user-agent, lazily initialised to the builtin value.
static DEFAULT_USER_AGENT: OnceLock<RwLock<String>> = OnceLock::new();

/// Returns the lock guarding the process-wide default user-agent, creating it
/// with the builtin value on first use.
fn default_user_agent_lock() -> &'static RwLock<String> {
    DEFAULT_USER_AGENT.get_or_init(|| RwLock::new(BUILTIN_DEFAULT_USER_AGENT.to_owned()))
}

/// Sets the process-wide default user-agent for outbound requests.
///
/// Applies to subsequently created [`HttpClient`] values (those that do not set
/// their own via [`HttpClientBuilder::user_agent`]) and to the engine's other
/// outbound token traffic that reads this default — the PKCE provider's
/// token/refresh requests and the client-credentials injector. A per-client
/// user-agent still overrides it for that client.
///
/// A poisoned lock is recovered rather than ignored: the guarded value is a
/// plain `String`, so it cannot be observed half-updated, and dropping the
/// write would leave the default stuck for the rest of the process.
pub fn set_default_user_agent(user_agent: impl Into<String>) {
    // Convert before taking the lock so a panicking `Into` impl cannot poison it.
    let user_agent = user_agent.into();
    let mut current = default_user_agent_lock()
        .write()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *current = user_agent;
}

/// Returns the process-wide default user-agent set via
/// [`set_default_user_agent`], or the builtin default when none was set.
///
/// Used by [`HttpClientBuilder`] and by the engine's OAuth token requests so
/// that all outbound traffic carries the same user-agent. A poisoned lock is
/// recovered, so the most recently stored value is still returned.
pub(crate) fn default_user_agent() -> String {
    default_user_agent_lock()
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .clone()
}
51
/// Serializes unit tests that mutate the process-wide default user-agent so
/// they cannot observe one another's writes. Integration tests in
/// `tests/foundation.rs` run in a separate binary and use their own lock.
///
/// Compiled only for test builds. Tests hold the guard for as long as they
/// rely on the default user-agent value.
#[cfg(test)]
pub(crate) static UA_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
57
/// Restores the process-wide default user-agent to the builtin on drop, so a
/// panicking assertion in a test that mutates it cannot leak the value into
/// later tests in this binary. Declare it after acquiring [`UA_TEST_LOCK`] so
/// the reset runs while the lock is still held.
///
/// Locals drop in reverse declaration order, which is why declaring this guard
/// after the lock guard makes the reset happen first.
#[cfg(test)]
pub(crate) struct RestoreDefaultUserAgent;

#[cfg(test)]
impl Drop for RestoreDefaultUserAgent {
    /// Resets the default user-agent to the builtin value.
    fn drop(&mut self) {
        set_default_user_agent(BUILTIN_DEFAULT_USER_AGENT);
    }
}
71
/// One entry of the `errors` array in a GraphQL response envelope.
///
/// Only `message` is declared here, so it is the only member decoded.
#[derive(serde::Deserialize)]
struct GraphQlError {
    /// Error text; joined with `"; "` into the error reported to the caller.
    message: String,
}
76
/// Top-level shape of a GraphQL HTTP response body.
///
/// Derives `Default` because it is decoded through [`HttpClient::post`], whose
/// response type must implement `Default`.
#[derive(Default, serde::Deserialize)]
struct GraphQlEnvelope {
    /// The `data` member, when present; it may still be JSON `null`.
    data: Option<Value>,
    /// The `errors` member; empty when the response omits it.
    #[serde(default)]
    errors: Vec<GraphQlError>,
}
83
/// Structured debug event emitted by [`TransportLogger`].
///
/// Passed by reference to [`TransportLogger::debug`].
#[derive(Clone, Debug)]
pub struct TransportLogEvent {
    /// Event name such as `http request` or `retrying request`.
    pub message: &'static str,
    /// Stable event fields. Stored in a `BTreeMap`, so iteration is ordered by
    /// field name.
    pub fields: BTreeMap<String, String>,
}
92
/// Debug logger interface for transport events.
///
/// The `Send + Sync + Debug` supertraits let a logger be stored as
/// `Arc<dyn TransportLogger>` inside [`HttpClient`], which derives `Debug`.
pub trait TransportLogger: Send + Sync + std::fmt::Debug {
    /// Records one debug event.
    fn debug(&self, event: &TransportLogEvent);
}
98
/// Logger that intentionally drops transport events.
///
/// This is the logger [`HttpClientBuilder::new`] installs until another one is
/// supplied.
#[derive(Clone, Debug, Default)]
pub struct NoopTransportLogger;

impl TransportLogger for NoopTransportLogger {
    /// Discards the event.
    fn debug(&self, _event: &TransportLogEvent) {}
}
106
/// Authenticated HTTP client for CLI command implementations.
///
/// The client covers the transport behavior command authors usually need: auth
/// injection, JSON request/response helpers, structured HTTP errors,
/// idempotent retries, ETag helpers, raw streaming helpers, multipart helpers,
/// and GraphQL envelope decoding.
#[derive(Clone, Debug)]
pub struct HttpClient {
    /// Underlying `reqwest` client that executes every request.
    base: reqwest::Client,
    /// Prefix concatenated directly with each request `path` (no separator is
    /// inserted).
    base_url: String,
    /// Auth injector applied to built requests before they are sent.
    auth: Arc<dyn AuthInjector>,
    /// Value sent in the `User-Agent` header.
    user_agent: String,
    /// Extra headers added to requests.
    default_headers: BTreeMap<String, String>,
    /// Sink for transport debug events.
    logger: Arc<dyn TransportLogger>,
}
122
/// Builder for [`HttpClient`].
///
/// Holds the same settings as the client minus the `reqwest` client, which is
/// created by [`HttpClientBuilder::build`].
#[derive(Clone, Debug)]
pub struct HttpClientBuilder {
    /// Base URL handed to the built client.
    base_url: String,
    /// Auth injector handed to the built client.
    auth: Arc<dyn AuthInjector>,
    /// User-agent; starts as the process-wide default.
    user_agent: String,
    /// Headers sent on every request; starts empty.
    default_headers: BTreeMap<String, String>,
    /// Transport debug logger; starts as [`NoopTransportLogger`].
    logger: Arc<dyn TransportLogger>,
}
132
133impl HttpClientBuilder {
134    /// Creates a builder with a base URL and auth injector.
135    #[must_use]
136    pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
137        Self {
138            base_url: base_url.into(),
139            auth,
140            user_agent: default_user_agent(),
141            default_headers: BTreeMap::new(),
142            logger: Arc::new(NoopTransportLogger),
143        }
144    }
145
146    /// Sets the user-agent for this client.
147    #[must_use]
148    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
149        self.user_agent = user_agent.into();
150        self
151    }
152
153    /// Alias for [`HttpClientBuilder::user_agent`] for migration readability.
154    #[must_use]
155    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Self {
156        self.user_agent(user_agent)
157    }
158
159    /// Sets headers sent on every request.
160    #[must_use]
161    pub fn default_headers(mut self, headers: BTreeMap<String, String>) -> Self {
162        self.default_headers = headers;
163        self
164    }
165
166    /// Alias for [`HttpClientBuilder::default_headers`] for migration readability.
167    #[must_use]
168    pub fn with_default_headers(self, headers: BTreeMap<String, String>) -> Self {
169        self.default_headers(headers)
170    }
171
172    /// Sets the transport debug logger.
173    #[must_use]
174    pub fn logger(mut self, logger: Arc<dyn TransportLogger>) -> Self {
175        self.logger = logger;
176        self
177    }
178
179    /// Alias for [`HttpClientBuilder::logger`] for migration readability.
180    #[must_use]
181    pub fn with_logger(self, logger: Arc<dyn TransportLogger>) -> Self {
182        self.logger(logger)
183    }
184
185    /// Builds the client.
186    #[must_use]
187    pub fn build(self) -> HttpClient {
188        HttpClient {
189            base: reqwest::Client::new(),
190            base_url: self.base_url,
191            auth: self.auth,
192            user_agent: self.user_agent,
193            default_headers: self.default_headers,
194            logger: self.logger,
195        }
196    }
197}
198
199impl HttpClient {
    /// Creates a client builder.
    ///
    /// Shorthand for [`HttpClientBuilder::new`]; `base_url` and `auth` are
    /// forwarded unchanged.
    #[must_use]
    pub fn builder(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> HttpClientBuilder {
        HttpClientBuilder::new(base_url, auth)
    }
205
    /// Creates a client with default settings.
    ///
    /// Equivalent to building an untouched [`HttpClientBuilder`]: the
    /// process-wide default user-agent, no default headers, and the no-op
    /// transport logger.
    #[must_use]
    pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
        HttpClientBuilder::new(base_url, auth).build()
    }
211
    /// Sends GET and decodes a JSON response.
    ///
    /// No request body is sent. The request goes through `send_with_retry`
    /// (via `do_json`), so it may be attempted up to `MAX_RETRIES` times.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or decoding the response.
    pub async fn get<T: Default + DeserializeOwned>(&self, path: &str) -> Result<T> {
        self.do_json(Method::GET, path, Option::<&()>::None).await
    }
216
    /// Sends GET and checks only for success.
    ///
    /// No request body is sent and the response body is not decoded. The
    /// request goes through `send_with_retry` (via `do_empty`).
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or from the success check.
    pub async fn get_without_response(&self, path: &str) -> Result<()> {
        self.do_empty(Method::GET, path, Option::<&()>::None).await
    }
221
    /// Sends POST with a JSON body and decodes a JSON response.
    ///
    /// Delegates to `do_json`; whether a failed attempt is retried is decided
    /// by `send_with_retry`.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or decoding the response.
    pub async fn post<B: Serialize, T: Default + DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
    ) -> Result<T> {
        self.do_json(Method::POST, path, Some(body)).await
    }
230
    /// Sends POST with a JSON body and checks only for success.
    ///
    /// Delegates to `do_empty`; the response body is not decoded.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or from the success check.
    pub async fn post_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::POST, path, Some(body)).await
    }
235
    /// Sends PUT with a JSON body and decodes a JSON response.
    ///
    /// Delegates to `do_json`; whether a failed attempt is retried is decided
    /// by `send_with_retry`.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or decoding the response.
    pub async fn put<B: Serialize, T: Default + DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
    ) -> Result<T> {
        self.do_json(Method::PUT, path, Some(body)).await
    }
244
    /// Sends PUT with a JSON body and checks only for success.
    ///
    /// Delegates to `do_empty`; the response body is not decoded.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or from the success check.
    pub async fn put_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::PUT, path, Some(body)).await
    }
249
    /// Sends PATCH with a JSON body and decodes a JSON response.
    ///
    /// Delegates to `do_json`; whether a failed attempt is retried is decided
    /// by `send_with_retry`.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or decoding the response.
    pub async fn patch<B: Serialize, T: Default + DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
    ) -> Result<T> {
        self.do_json(Method::PATCH, path, Some(body)).await
    }
258
    /// Sends PATCH with a JSON body and checks only for success.
    ///
    /// Delegates to `do_empty`; the response body is not decoded.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or from the success check.
    pub async fn patch_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::PATCH, path, Some(body)).await
    }
263
    /// Sends DELETE and checks for success.
    ///
    /// No request body is sent. Delegates to `do_empty`, so the request goes
    /// through `send_with_retry`.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or from the success check.
    pub async fn delete(&self, path: &str) -> Result<()> {
        self.do_empty(Method::DELETE, path, Option::<&()>::None)
            .await
    }
269
    /// Sends DELETE with a JSON body and checks for success.
    ///
    /// Delegates to `do_empty`; the response body is not decoded.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or from the success check.
    pub async fn delete_with_body<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
        self.do_empty(Method::DELETE, path, Some(body)).await
    }
274
275    /// Sends GET and returns decoded JSON plus the ETag header.
276    pub async fn get_etag<T: Default + DeserializeOwned>(&self, path: &str) -> Result<(T, String)> {
277        let response = self.send_get_status_only_retry(path).await?;
278        let etag = response
279            .headers()
280            .get(header::ETAG)
281            .and_then(|value| value.to_str().ok())
282            .unwrap_or_default()
283            .to_owned();
284        let value = decode_json_response(response, "GET", path).await?;
285        Ok((value, etag))
286    }
287
288    /// Sends GET and returns only the ETag header after checking success.
289    pub async fn get_etag_without_response(&self, path: &str) -> Result<String> {
290        let response = self.send_get_status_only_retry(path).await?;
291        let etag = response
292            .headers()
293            .get(header::ETAG)
294            .and_then(|value| value.to_str().ok())
295            .unwrap_or_default()
296            .to_owned();
297        ensure_success_response(response, "GET", path).await?;
298        Ok(etag)
299    }
300
301    /// Sends PUT with `If-Match` and decodes a JSON response.
302    pub async fn put_if_match<B: Serialize, T: Default + DeserializeOwned>(
303        &self,
304        path: &str,
305        body: &B,
306        etag: &str,
307    ) -> Result<T> {
308        let response = self.send_put_if_match(path, body, etag).await?;
309        decode_json_response(response, "PUT", path).await
310    }
311
312    /// Sends PUT with `If-Match` and checks only for success.
313    pub async fn put_if_match_without_response<B: Serialize>(
314        &self,
315        path: &str,
316        body: &B,
317        etag: &str,
318    ) -> Result<()> {
319        let response = self.send_put_if_match(path, body, etag).await?;
320        ensure_success_response(response, "PUT", path).await
321    }
322
323    /// Streams a raw GET response body into a writer.
324    pub async fn get_raw(&self, path: &str, writer: &mut dyn Write) -> Result<()> {
325        let response = self.send_get_raw_status_only_retry(path).await?;
326        if response.status().is_client_error() || response.status().is_server_error() {
327            return Err(parse_error_response(response, "GET", path).await.into());
328        }
329        let bytes = response
330            .bytes()
331            .await
332            .map_err(|err| CliCoreError::message(format!("transport: stream response: {err}")))?;
333        writer.write_all(&bytes)?;
334        Ok(())
335    }
336
337    /// Sends GET and returns the raw response body as bytes.
338    pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
339        let response = self.send_get_raw_status_only_retry(path).await?;
340        if response.status().is_client_error() || response.status().is_server_error() {
341            return Err(parse_error_response(response, "GET", path).await.into());
342        }
343        let bytes = response
344            .bytes()
345            .await
346            .map_err(|err| CliCoreError::message(format!("transport: stream response: {err}")))?;
347        Ok(bytes.to_vec())
348    }
349
350    /// Sends POST and streams the raw response body into a writer.
351    pub async fn post_raw<B: Serialize>(
352        &self,
353        path: &str,
354        body: Option<&B>,
355        writer: &mut dyn Write,
356    ) -> Result<()> {
357        let response = self.send_post_raw_once(path, body).await?;
358        if response.status().is_client_error() || response.status().is_server_error() {
359            return Err(parse_error_response(response, "POST", path).await.into());
360        }
361        let bytes = response
362            .bytes()
363            .await
364            .map_err(|err| CliCoreError::message(format!("transport: stream response: {err}")))?;
365        writer.write_all(&bytes)?;
366        Ok(())
367    }
368
    /// Sends a raw-body request and decodes a JSON response.
    ///
    /// `body` is converted to bytes and forwarded to
    /// [`HttpClient::do_raw_optional_body`] as `Some(..)`; `content_type` is
    /// passed through unchanged.
    pub async fn do_raw<T: Default + DeserializeOwned>(
        &self,
        method: Method,
        path: &str,
        content_type: &str,
        body: impl Into<Vec<u8>>,
    ) -> Result<T> {
        self.do_raw_optional_body(method, path, content_type, Some(body.into()))
            .await
    }
380
381    /// Sends an optional raw-body request and decodes a JSON response.
382    pub async fn do_raw_optional_body<T: Default + DeserializeOwned>(
383        &self,
384        method: Method,
385        path: &str,
386        content_type: &str,
387        body: Option<Vec<u8>>,
388    ) -> Result<T> {
389        let method_text = method.as_str().to_owned();
390        let response = self.send_raw_once(method, path, content_type, body).await?;
391        decode_json_response(response, &method_text, path).await
392    }
393
    /// Sends a raw-body request and checks only for success.
    ///
    /// `body` is converted to bytes and forwarded to
    /// [`HttpClient::do_raw_optional_body_without_response`] as `Some(..)`;
    /// `content_type` is passed through unchanged.
    pub async fn do_raw_without_response(
        &self,
        method: Method,
        path: &str,
        content_type: &str,
        body: impl Into<Vec<u8>>,
    ) -> Result<()> {
        self.do_raw_optional_body_without_response(method, path, content_type, Some(body.into()))
            .await
    }
405
406    /// Sends an optional raw-body request and checks only for success.
407    pub async fn do_raw_optional_body_without_response(
408        &self,
409        method: Method,
410        path: &str,
411        content_type: &str,
412        body: Option<Vec<u8>>,
413    ) -> Result<()> {
414        let method_text = method.as_str().to_owned();
415        let response = self.send_raw_once(method, path, content_type, body).await?;
416        ensure_success_response(response, &method_text, path).await
417    }
418
419    /// Sends a multipart file upload and decodes a JSON response.
420    pub async fn post_multipart<T: Default + DeserializeOwned>(
421        &self,
422        path: &str,
423        field_name: &str,
424        file_path: &Path,
425    ) -> Result<T> {
426        self.post_multipart_with_fields(path, field_name, file_path, &BTreeMap::new())
427            .await
428    }
429
430    /// Sends a multipart file upload and checks only for success.
431    pub async fn post_multipart_without_response(
432        &self,
433        path: &str,
434        field_name: &str,
435        file_path: &Path,
436    ) -> Result<()> {
437        self.post_multipart_with_fields_without_response(
438            path,
439            field_name,
440            file_path,
441            &BTreeMap::new(),
442        )
443        .await
444    }
445
    /// Sends a multipart file upload with fields and decodes a JSON response.
    ///
    /// The form is assembled by `multipart_form` (text `fields` plus the file
    /// at `file_path` under `file_field`) and posted by `send_multipart`.
    ///
    /// # Errors
    ///
    /// Propagates errors from reading the file, sending the request, or
    /// decoding the response.
    pub async fn post_multipart_with_fields<T: Default + DeserializeOwned>(
        &self,
        path: &str,
        file_field: &str,
        file_path: &Path,
        fields: &BTreeMap<String, String>,
    ) -> Result<T> {
        let form = self.multipart_form(file_field, file_path, fields).await?;
        self.send_multipart(path, form).await
    }
457
458    async fn multipart_form(
459        &self,
460        file_field: &str,
461        file_path: &Path,
462        fields: &BTreeMap<String, String>,
463    ) -> Result<reqwest::multipart::Form> {
464        let mut form = reqwest::multipart::Form::new();
465        for (key, value) in fields {
466            form = form.text(key.clone(), value.clone());
467        }
468        let file_name = file_path
469            .file_name()
470            .and_then(|name| name.to_str())
471            .unwrap_or("file")
472            .to_owned();
473        let bytes = tokio::fs::read(file_path)
474            .await
475            .map_err(|err| CliCoreError::message(format!("transport: open file: {err}")))?;
476        let part = reqwest::multipart::Part::bytes(bytes).file_name(file_name);
477        form = form.part(file_field.to_owned(), part);
478        Ok(form)
479    }
480
    /// Sends a multipart file upload with fields and checks only for success.
    ///
    /// The form is assembled by `multipart_form` (text `fields` plus the file
    /// at `file_path` under `file_field`) and posted by
    /// `send_multipart_without_response`.
    ///
    /// # Errors
    ///
    /// Propagates errors from reading the file, sending the request, or the
    /// success check.
    pub async fn post_multipart_with_fields_without_response(
        &self,
        path: &str,
        file_field: &str,
        file_path: &Path,
        fields: &BTreeMap<String, String>,
    ) -> Result<()> {
        let form = self.multipart_form(file_field, file_path, fields).await?;
        self.send_multipart_without_response(path, form).await
    }
492
493    /// Sends multipart form fields without a file and decodes a JSON response.
494    pub async fn post_multipart_fields<T: Default + DeserializeOwned>(
495        &self,
496        path: &str,
497        fields: &BTreeMap<String, String>,
498    ) -> Result<T> {
499        let mut form = reqwest::multipart::Form::new();
500        for (key, value) in fields {
501            form = form.text(key.clone(), value.clone());
502        }
503        self.send_multipart(path, form).await
504    }
505
506    /// Sends multipart form fields without a file and checks only for success.
507    pub async fn post_multipart_fields_without_response(
508        &self,
509        path: &str,
510        fields: &BTreeMap<String, String>,
511    ) -> Result<()> {
512        let mut form = reqwest::multipart::Form::new();
513        for (key, value) in fields {
514            form = form.text(key.clone(), value.clone());
515        }
516        self.send_multipart_without_response(path, form).await
517    }
518
    /// Sends a GraphQL request and decodes the `data` envelope into a value.
    ///
    /// Wraps `variables` in `Some` and delegates to
    /// [`HttpClient::post_graphql_optional_variables`]. When the response has
    /// no `data`, or `data` is `null`, the result is `T::default()`.
    ///
    /// # Errors
    ///
    /// Propagates transport errors, a `graphql:` error when the response lists
    /// errors, and `data` decoding errors.
    pub async fn post_graphql<T: DeserializeOwned + Default>(
        &self,
        path: &str,
        query: &str,
        variables: BTreeMap<String, Value>,
    ) -> Result<T> {
        self.post_graphql_optional_variables(path, query, Some(variables))
            .await
    }
529
530    /// Sends a GraphQL request with optional variables and decodes `data`.
531    pub async fn post_graphql_optional_variables<T: DeserializeOwned + Default>(
532        &self,
533        path: &str,
534        query: &str,
535        variables: Option<BTreeMap<String, Value>>,
536    ) -> Result<T> {
537        let mut result = T::default();
538        self.post_graphql_optional_variables_into(path, query, variables, &mut result)
539            .await?;
540        Ok(result)
541    }
542
    /// Sends a GraphQL request and checks only for GraphQL/HTTP success.
    ///
    /// Wraps `variables` in `Some` and delegates to
    /// [`HttpClient::post_graphql_optional_variables_without_response`]; the
    /// response's `data` is discarded.
    ///
    /// # Errors
    ///
    /// Propagates transport errors and a `graphql:` error when the response
    /// lists errors.
    pub async fn post_graphql_without_response(
        &self,
        path: &str,
        query: &str,
        variables: BTreeMap<String, Value>,
    ) -> Result<()> {
        self.post_graphql_optional_variables_without_response(path, query, Some(variables))
            .await
    }
553
554    /// Sends a GraphQL request with optional variables and checks only for success.
555    pub async fn post_graphql_optional_variables_without_response(
556        &self,
557        path: &str,
558        query: &str,
559        variables: Option<BTreeMap<String, Value>>,
560    ) -> Result<()> {
561        self.post_graphql_response_envelope(path, query, variables)
562            .await?;
563        Ok(())
564    }
565
    /// Sends a GraphQL request and decodes `data` into an existing value.
    ///
    /// Wraps `variables` in `Some` and delegates to
    /// [`HttpClient::post_graphql_optional_variables_into`]. `result` is left
    /// untouched when the response has no `data` or `data` is `null`.
    ///
    /// # Errors
    ///
    /// Propagates transport errors, a `graphql:` error when the response lists
    /// errors, and `data` decoding errors.
    pub async fn post_graphql_into<T: DeserializeOwned>(
        &self,
        path: &str,
        query: &str,
        variables: BTreeMap<String, Value>,
        result: &mut T,
    ) -> Result<()> {
        self.post_graphql_optional_variables_into(path, query, Some(variables), result)
            .await
    }
577
578    /// Sends a GraphQL request with optional variables and decodes into an existing value.
579    pub async fn post_graphql_optional_variables_into<T: DeserializeOwned>(
580        &self,
581        path: &str,
582        query: &str,
583        variables: Option<BTreeMap<String, Value>>,
584        result: &mut T,
585    ) -> Result<()> {
586        let envelope = self
587            .post_graphql_response_envelope(path, query, variables)
588            .await?;
589        if let Some(data) = envelope.data
590            && !data.is_null()
591        {
592            *result = serde_json::from_value(data).map_err(|err| {
593                CliCoreError::message(format!("transport: decode graphql data: {err}"))
594            })?;
595        }
596        Ok(())
597    }
598
599    async fn do_json<B: Serialize, T: Default + DeserializeOwned>(
600        &self,
601        method: Method,
602        path: &str,
603        body: Option<&B>,
604    ) -> Result<T> {
605        let method_text = method.as_str().to_owned();
606        let response = self.send_with_retry(method, path, body).await?;
607        decode_json_response(response, &method_text, path).await
608    }
609
610    async fn post_graphql_response_envelope(
611        &self,
612        path: &str,
613        query: &str,
614        variables: Option<BTreeMap<String, Value>>,
615    ) -> Result<GraphQlEnvelope> {
616        #[derive(Serialize)]
617        struct Request<'query> {
618            query: &'query str,
619            variables: Option<BTreeMap<String, Value>>,
620        }
621
622        let envelope: GraphQlEnvelope = self.post(path, &Request { query, variables }).await?;
623        if !envelope.errors.is_empty() {
624            let message = envelope
625                .errors
626                .iter()
627                .map(|error| error.message.as_str())
628                .collect::<Vec<_>>()
629                .join("; ");
630            return Err(CliCoreError::message(format!("graphql: {message}")));
631        }
632        Ok(envelope)
633    }
634
635    async fn send_put_if_match<B: Serialize>(
636        &self,
637        path: &str,
638        body: &B,
639        etag: &str,
640    ) -> Result<reqwest::Response> {
641        let mut request = self
642            .build_request(Method::PUT, path, Some(body))?
643            .header(header::IF_MATCH, etag)
644            .build()
645            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
646        self.inject_auth(&mut request).await?;
647        let url = format!("{}{}", self.base_url, path);
648        self.log_debug(
649            "http request",
650            [("method", "PUT".to_owned()), ("url", url.clone())],
651        );
652        let response = self
653            .base
654            .execute(request)
655            .await
656            .map_err(|err| CliCoreError::message(format!("transport: PUT {path}: {err}")))?;
657        self.log_debug(
658            "http response",
659            [
660                ("status", response.status().as_u16().to_string()),
661                ("method", "PUT".to_owned()),
662                ("url", url),
663            ],
664        );
665        Ok(response)
666    }
667
    /// Posts a prepared multipart form to `path` and decodes the JSON
    /// response.
    ///
    /// Sending is delegated to `send_multipart_response`.
    async fn send_multipart<T: Default + DeserializeOwned>(
        &self,
        path: &str,
        form: reqwest::multipart::Form,
    ) -> Result<T> {
        let response = self.send_multipart_response(path, form).await?;
        decode_json_response(response, "POST", path).await
    }
676
    /// Posts a prepared multipart form to `path` and checks only for success.
    ///
    /// Sending is delegated to `send_multipart_response`.
    async fn send_multipart_without_response(
        &self,
        path: &str,
        form: reqwest::multipart::Form,
    ) -> Result<()> {
        let response = self.send_multipart_response(path, form).await?;
        ensure_success_response(response, "POST", path).await
    }
685
686    async fn send_multipart_response(
687        &self,
688        path: &str,
689        form: reqwest::multipart::Form,
690    ) -> Result<reqwest::Response> {
691        let url = format!("{}{}", self.base_url, path);
692        let mut builder = self
693            .base
694            .post(url.clone())
695            .header(header::USER_AGENT, self.user_agent.clone())
696            .multipart(form);
697        for (key, value) in &self.default_headers {
698            builder = builder.header(key, value);
699        }
700        let mut request = builder
701            .build()
702            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
703        self.inject_auth(&mut request).await?;
704        self.log_debug("http multipart request", [("url", url)]);
705        self.base
706            .execute(request)
707            .await
708            .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
709    }
710
711    async fn do_empty<B: Serialize>(
712        &self,
713        method: Method,
714        path: &str,
715        body: Option<&B>,
716    ) -> Result<()> {
717        let method_text = method.as_str().to_owned();
718        let response = self.send_with_retry(method, path, body).await?;
719        ensure_success_response(response, &method_text, path).await
720    }
721
722    async fn send_raw_once(
723        &self,
724        method: Method,
725        path: &str,
726        content_type: &str,
727        body: Option<Vec<u8>>,
728    ) -> Result<reqwest::Response> {
729        let url = format!("{}{}", self.base_url, path);
730        let method_text = method.as_str().to_owned();
731        let mut builder = self
732            .base
733            .request(method, url)
734            .header(header::USER_AGENT, self.user_agent.clone());
735        if let Some(body) = body {
736            builder = builder.body(body);
737        }
738        if !content_type.is_empty() {
739            builder = builder.header(header::CONTENT_TYPE, content_type);
740        }
741        for (key, value) in &self.default_headers {
742            builder = builder.header(key, value);
743        }
744        let mut request = builder
745            .build()
746            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
747        self.inject_auth(&mut request).await?;
748        self.log_debug(
749            "http request",
750            [
751                ("method", method_text.clone()),
752                ("url", format!("{}{}", self.base_url, path)),
753            ],
754        );
755        self.base
756            .execute(request)
757            .await
758            .map_err(|err| CliCoreError::message(format!("transport: {method_text} {path}: {err}")))
759    }
760
761    async fn send_get_raw_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
762        let mut last_err = None;
763        for attempt in 0..MAX_RETRIES {
764            if attempt > 0 {
765                let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
766                time::sleep(backoff).await;
767            }
768
769            match self.send_get_raw_once(path).await {
770                Ok(response) => {
771                    if response.status() == StatusCode::TOO_MANY_REQUESTS
772                        || response.status().is_server_error()
773                    {
774                        last_err = Some(CliCoreError::message(format!(
775                            "transport: GET {}: status {}",
776                            path,
777                            response.status().as_u16()
778                        )));
779                        continue;
780                    }
781                    return Ok(response);
782                }
783                Err(err) => last_err = Some(err),
784            }
785        }
786        Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
787    }
788
789    async fn send_get_raw_once(&self, path: &str) -> Result<reqwest::Response> {
790        let url = format!("{}{}", self.base_url, path);
791        let mut builder = self
792            .base
793            .get(url.clone())
794            .header(header::USER_AGENT, self.user_agent.clone());
795        for (key, value) in &self.default_headers {
796            builder = builder.header(key, value);
797        }
798        let mut request = builder
799            .build()
800            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
801        self.inject_auth(&mut request).await?;
802        self.log_debug("http raw request", [("url", url)]);
803        self.base
804            .execute(request)
805            .await
806            .map_err(|err| CliCoreError::message(format!("transport: GET {path}: {err}")))
807    }
808
809    async fn send_post_raw_once<B: Serialize>(
810        &self,
811        path: &str,
812        body: Option<&B>,
813    ) -> Result<reqwest::Response> {
814        let mut request = self
815            .build_request(Method::POST, path, body)?
816            .build()
817            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
818        self.inject_auth(&mut request).await?;
819        self.log_debug(
820            "http post raw request",
821            [("url", format!("{}{}", self.base_url, path))],
822        );
823        self.base
824            .execute(request)
825            .await
826            .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
827    }
828
829    async fn send_with_retry<B: Serialize>(
830        &self,
831        method: Method,
832        path: &str,
833        body: Option<&B>,
834    ) -> Result<reqwest::Response> {
835        let mut last_err = None;
836        for attempt in 0..MAX_RETRIES {
837            if attempt > 0 {
838                let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
839                self.log_debug(
840                    "retrying request",
841                    [
842                        ("attempt", (attempt + 1).to_string()),
843                        ("backoff", format!("{backoff:?}")),
844                    ],
845                );
846                time::sleep(backoff).await;
847            }
848
849            match self.send_once(method.clone(), path, body).await {
850                Ok(response) => {
851                    if retryable_status(method.clone(), response.status()) {
852                        last_err =
853                            Some(retryable_status_error(response, method.as_str(), path).await);
854                        continue;
855                    }
856                    return Ok(response);
857                }
858                Err(err) if is_idempotent(&method) => {
859                    last_err = Some(err);
860                }
861                Err(err) => return Err(err),
862            }
863        }
864        Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
865    }
866
867    async fn send_get_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
868        let mut last_err = None;
869        for attempt in 0..MAX_RETRIES {
870            if attempt > 0 {
871                let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
872                self.log_debug(
873                    "retrying request",
874                    [
875                        ("attempt", (attempt + 1).to_string()),
876                        ("backoff", format!("{backoff:?}")),
877                    ],
878                );
879                time::sleep(backoff).await;
880            }
881
882            match self.send_once(Method::GET, path, Option::<&()>::None).await {
883                Ok(response) => {
884                    if response.status() == StatusCode::TOO_MANY_REQUESTS
885                        || response.status().is_server_error()
886                    {
887                        last_err = Some(CliCoreError::message(format!(
888                            "transport: GET {}: status {}",
889                            path,
890                            response.status().as_u16()
891                        )));
892                        continue;
893                    }
894                    return Ok(response);
895                }
896                Err(err) => last_err = Some(err),
897            }
898        }
899        Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
900    }
901
902    async fn send_once<B: Serialize>(
903        &self,
904        method: Method,
905        path: &str,
906        body: Option<&B>,
907    ) -> Result<reqwest::Response> {
908        let mut request = self
909            .build_request(method.clone(), path, body)?
910            .build()
911            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
912        self.inject_auth(&mut request).await?;
913        let method_text = method.as_str().to_owned();
914        self.log_debug(
915            "http request",
916            [
917                ("method", method_text.clone()),
918                ("url", format!("{}{}", self.base_url, path)),
919            ],
920        );
921        let response = self.base.execute(request).await.map_err(|err| {
922            CliCoreError::message(format!("transport: {method_text} {path}: {err}"))
923        })?;
924        self.log_debug(
925            "http response",
926            [
927                ("status", response.status().as_u16().to_string()),
928                ("method", method_text),
929                ("url", format!("{}{}", self.base_url, path)),
930            ],
931        );
932        Ok(response)
933    }
934
935    fn build_request<B: Serialize>(
936        &self,
937        method: Method,
938        path: &str,
939        body: Option<&B>,
940    ) -> Result<reqwest::RequestBuilder> {
941        let url = format!("{}{}", self.base_url, path);
942        let mut builder = self
943            .base
944            .request(method, url)
945            .header(header::USER_AGENT, self.user_agent.clone());
946        if let Some(body) = body {
947            let body = serde_json::to_vec(body)
948                .map_err(|err| CliCoreError::message(format!("transport: marshal body: {err}")))?;
949            builder = builder
950                .header(header::CONTENT_TYPE, "application/json")
951                .body(body);
952        }
953        for (key, value) in &self.default_headers {
954            builder = builder.header(key, value);
955        }
956        Ok(builder)
957    }
958
959    fn log_debug(
960        &self,
961        message: &'static str,
962        fields: impl IntoIterator<Item = (&'static str, String)>,
963    ) {
964        self.logger.debug(&TransportLogEvent {
965            message,
966            fields: fields
967                .into_iter()
968                .map(|(key, value)| (key.to_owned(), value))
969                .collect(),
970        });
971    }
972
973    async fn inject_auth(&self, request: &mut reqwest::Request) -> Result<()> {
974        self.auth
975            .inject(request)
976            .await
977            .map_err(|err| CliCoreError::message(format!("transport: auth inject: {err}")))
978    }
979}
980
981async fn decode_json_response<T: Default + DeserializeOwned>(
982    response: reqwest::Response,
983    method: &str,
984    path: &str,
985) -> Result<T> {
986    if response.status().is_client_error() || response.status().is_server_error() {
987        return Err(parse_error_response(response, method, path).await.into());
988    }
989    if response.status() == StatusCode::NO_CONTENT {
990        return Ok(T::default());
991    }
992    let body = response
993        .bytes()
994        .await
995        .map_err(|err| CliCoreError::message(format!("transport: decode response: {err}")))?;
996    if body.trim_ascii() == b"null" {
997        return Ok(T::default());
998    }
999    serde_json::from_slice::<T>(&body)
1000        .map_err(|err| CliCoreError::message(format!("transport: decode response: {err}")))
1001}
1002
1003async fn ensure_success_response(
1004    response: reqwest::Response,
1005    method: &str,
1006    path: &str,
1007) -> Result<()> {
1008    if response.status().is_client_error() || response.status().is_server_error() {
1009        return Err(parse_error_response(response, method, path).await.into());
1010    }
1011    Ok(())
1012}
1013
1014/// Converts a non-success HTTP response into the shared transport error shape.
1015///
1016/// If the response body already contains an API-style error document, the
1017/// service message is preserved and the HTTP status is normalized into the
1018/// error code. Otherwise the method, path, status, and response body are folded
1019/// into a readable fallback message.
1020pub async fn parse_error_response(response: reqwest::Response, method: &str, path: &str) -> Error {
1021    let status = response.status();
1022    let body = response.text().await.unwrap_or_default();
1023    parse_error_body(status, &body, method, path)
1024}
1025
1026fn parse_error_body(status: StatusCode, body: &str, method: &str, path: &str) -> Error {
1027    if let Ok(mut api_error) = serde_json::from_str::<Error>(body)
1028        && !api_error.message.is_empty()
1029    {
1030        api_error.code = format!("HTTP_{}", status.as_u16());
1031        return api_error;
1032    }
1033    Error {
1034        code: format!("HTTP_{}", status.as_u16()),
1035        message: format!("{} {}: {} {}", method, path, status.as_u16(), body),
1036        system: String::new(),
1037        request_id: String::new(),
1038    }
1039}
1040
1041fn retryable_status(method: Method, status: StatusCode) -> bool {
1042    status == StatusCode::TOO_MANY_REQUESTS || (status.is_server_error() && is_idempotent(&method))
1043}
1044
1045async fn retryable_status_error(
1046    response: reqwest::Response,
1047    method: &str,
1048    path: &str,
1049) -> CliCoreError {
1050    let status = response.status().as_u16();
1051    match response.text().await {
1052        Ok(body) => CliCoreError::message(format!(
1053            "transport: {method} {path}: status {status}: {body}"
1054        )),
1055        Err(err) => CliCoreError::message(format!(
1056            "transport: {method} {path}: status {status} (body read failed: {err})"
1057        )),
1058    }
1059}
1060
1061fn is_idempotent(method: &Method) -> bool {
1062    matches!(*method, Method::GET | Method::HEAD | Method::DELETE)
1063}