Skip to main content

cli_engine/transport/
client.rs

1use std::{
2    collections::BTreeMap,
3    io::Write,
4    path::Path,
5    sync::{Arc, OnceLock, RwLock},
6    time::Duration,
7};
8
9use bytes::Bytes;
10use reqwest::{Method, StatusCode, header};
11use serde::{Serialize, de::DeserializeOwned};
12use serde_json::Value;
13use tokio::time;
14
15use super::{AuthInjector, Error};
16use crate::{CliCoreError, Result};
17
18const MAX_RETRIES: usize = 3;
19const BASE_BACKOFF: Duration = Duration::from_millis(500);
20const BUILTIN_DEFAULT_USER_AGENT: &str = "cli/dev";
21static DEFAULT_USER_AGENT: OnceLock<RwLock<String>> = OnceLock::new();
22
23/// Sets the process-wide default user-agent for outbound requests.
24///
25/// Applies to subsequently created [`HttpClient`] values (those that do not set
26/// their own via [`HttpClientBuilder::user_agent`]) and to the engine's other
27/// outbound token traffic that reads this default — the PKCE provider's
28/// token/refresh requests and the client-credentials injector. A per-client
29/// user-agent still overrides it for that client.
30pub fn set_default_user_agent(user_agent: impl Into<String>) {
31    let lock =
32        DEFAULT_USER_AGENT.get_or_init(|| RwLock::new(BUILTIN_DEFAULT_USER_AGENT.to_owned()));
33    if let Ok(mut current) = lock.write() {
34        *current = user_agent.into();
35    }
36}
37
38/// Returns the process-wide default user-agent set via
39/// [`set_default_user_agent`], or the builtin default when none was set.
40///
41/// Used by [`HttpClientBuilder`] and by the engine's OAuth token requests so
42/// that all outbound traffic carries the same user-agent.
43pub(crate) fn default_user_agent() -> String {
44    DEFAULT_USER_AGENT
45        .get_or_init(|| RwLock::new(BUILTIN_DEFAULT_USER_AGENT.to_owned()))
46        .read()
47        .map_or_else(
48            |_| BUILTIN_DEFAULT_USER_AGENT.to_owned(),
49            |value| value.clone(),
50        )
51}
52
53/// Serializes unit tests that mutate the process-wide default user-agent so
54/// they cannot observe one another's writes. Integration tests in
55/// `tests/foundation.rs` run in a separate binary and use their own lock.
56#[cfg(test)]
57pub(crate) static UA_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
58
59/// Restores the process-wide default user-agent to the builtin on drop, so a
60/// panicking assertion in a test that mutates it cannot leak the value into
61/// later tests in this binary. Declare it after acquiring [`UA_TEST_LOCK`] so
62/// the reset runs while the lock is still held.
63#[cfg(test)]
64pub(crate) struct RestoreDefaultUserAgent;
65
66#[cfg(test)]
67impl Drop for RestoreDefaultUserAgent {
68    fn drop(&mut self) {
69        set_default_user_agent(BUILTIN_DEFAULT_USER_AGENT);
70    }
71}
72
73static DEFAULT_TRANSPORT_LOGGER: OnceLock<RwLock<Arc<dyn TransportLogger>>> = OnceLock::new();
74
75fn default_transport_logger_lock() -> &'static RwLock<Arc<dyn TransportLogger>> {
76    DEFAULT_TRANSPORT_LOGGER.get_or_init(|| RwLock::new(Arc::new(NoopTransportLogger)))
77}
78
79/// Sets the process-wide default transport logger for outbound HTTP traffic.
80///
81/// Applies to subsequently created [`HttpClient`] values (those that do not set
82/// their own via [`HttpClientBuilder::logger`]) and to the free
83/// [`super::debug_log_reqwest_request`] / [`super::debug_log_reqwest_response`]
84/// helpers used by code that talks to `reqwest` directly.
85///
86/// The CLI installs a logger from this setter when `--debug` selects the
87/// `transport` component, so command handlers get request/response diagnostics
88/// without any per-command wiring. A per-client logger still overrides it for
89/// that client.
90pub fn set_default_transport_logger(logger: Arc<dyn TransportLogger>) {
91    // Recover from a poisoned lock (a panic while a writer held it) instead of
92    // silently doing nothing, which would leave a stale logger installed and
93    // make `--debug transport` appear ineffective.
94    let mut current = default_transport_logger_lock()
95        .write()
96        .unwrap_or_else(std::sync::PoisonError::into_inner);
97    *current = logger;
98}
99
100/// Returns the process-wide default transport logger set via
101/// [`set_default_transport_logger`], or a [`NoopTransportLogger`] when none was
102/// set.
103#[must_use]
104pub fn default_transport_logger() -> Arc<dyn TransportLogger> {
105    default_transport_logger_lock()
106        .read()
107        .unwrap_or_else(std::sync::PoisonError::into_inner)
108        .clone()
109}
110
111/// Logs a `reqwest::Request` to the process-wide default transport logger.
112///
113/// This is the bridge for code that talks to `reqwest` directly — bare clients
114/// or progenitor-generated clients that cannot use [`HttpClient`] — so a single
115/// `--debug`-controlled trace can still cover them. Captures the request method,
116/// URL, headers, and in-memory body. Pairs with [`debug_log_reqwest_response`].
117/// It is a no-op (no header clone or body copy) unless an enabled logger has
118/// been installed via [`set_default_transport_logger`].
119pub fn debug_log_reqwest_request(request: &reqwest::Request) {
120    let logger = default_transport_logger();
121    if !logger.enabled() {
122        return;
123    }
124    logger.debug(&TransportLogEvent {
125        message: "http request",
126        fields: BTreeMap::from([
127            ("method".to_owned(), request.method().as_str().to_owned()),
128            ("url".to_owned(), request.url().as_str().to_owned()),
129        ]),
130        headers: Some(header_pairs(request.headers())),
131        body: request
132            .body()
133            .and_then(reqwest::Body::as_bytes)
134            .map(<[u8]>::to_vec),
135    });
136}
137
138/// Logs an HTTP response (status, headers, body) to the process-wide default
139/// transport logger.
140///
141/// Companion to [`debug_log_reqwest_request`] for `reqwest`-direct call sites.
142/// The caller passes the already-read response body. It is a no-op (no header
143/// clone or body copy) unless an enabled logger has been installed via
144/// [`set_default_transport_logger`].
145pub fn debug_log_reqwest_response(status: StatusCode, headers: &header::HeaderMap, body: &[u8]) {
146    let logger = default_transport_logger();
147    if !logger.enabled() {
148        return;
149    }
150    logger.debug(&TransportLogEvent {
151        message: "http response",
152        fields: BTreeMap::from([("status".to_owned(), status.as_u16().to_string())]),
153        headers: Some(header_pairs(headers)),
154        body: Some(body.to_vec()),
155    });
156}
157
158/// Serializes unit tests that mutate the process-wide default transport logger
159/// so they cannot observe one another's writes. Integration tests in
160/// `tests/foundation.rs` run in a separate binary and use their own lock.
161#[cfg(test)]
162pub(crate) static TRANSPORT_LOGGER_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
163
164/// Restores the process-wide default transport logger to the noop on drop, so a
165/// panicking assertion in a test that installs a logger cannot leak it into
166/// later tests. Declare it after acquiring [`TRANSPORT_LOGGER_TEST_LOCK`] so the
167/// reset runs while the lock is still held.
168#[cfg(test)]
169pub(crate) struct RestoreDefaultTransportLogger;
170
171#[cfg(test)]
172impl Drop for RestoreDefaultTransportLogger {
173    fn drop(&mut self) {
174        set_default_transport_logger(Arc::new(NoopTransportLogger));
175    }
176}
177
178#[derive(serde::Deserialize)]
179struct GraphQlError {
180    message: String,
181}
182
183#[derive(Default, serde::Deserialize)]
184struct GraphQlEnvelope {
185    data: Option<Value>,
186    #[serde(default)]
187    errors: Vec<GraphQlError>,
188}
189
190/// Structured debug event emitted by [`TransportLogger`].
191///
192/// `message` and `fields` are the stable breadcrumb surface (method, url,
193/// status, retry attempt). `headers` and `body` carry the raw, un-redacted
194/// request or response payload when one is available; loggers that print these
195/// (such as [`StderrTransportLogger`](super::StderrTransportLogger)) are
196/// responsible for redacting sensitive headers.
197#[derive(Clone, Debug, Default)]
198pub struct TransportLogEvent {
199    /// Event name such as `http request` or `retrying request`.
200    pub message: &'static str,
201    /// Stable event fields.
202    pub fields: BTreeMap<String, String>,
203    /// Raw header name/value pairs for the request or response, when known.
204    pub headers: Option<Vec<(String, String)>>,
205    /// Raw request or response body bytes, when captured. Streaming and
206    /// byte-download responses omit this and report a `body_bytes` field
207    /// instead to avoid buffering large payloads into the log.
208    pub body: Option<Vec<u8>>,
209}
210
211/// Debug logger interface for transport events.
212pub trait TransportLogger: Send + Sync + std::fmt::Debug {
213    /// Records one debug event.
214    fn debug(&self, event: &TransportLogEvent);
215
216    /// Whether this logger records anything.
217    ///
218    /// Defaults to `true`. The transport checks this before capturing request
219    /// and response headers/bodies, so a logger that returns `false` (such as
220    /// [`NoopTransportLogger`]) keeps the common non-debug path free of those
221    /// clones.
222    fn enabled(&self) -> bool {
223        true
224    }
225}
226
227/// Logger that intentionally drops transport events.
228#[derive(Clone, Debug, Default)]
229pub struct NoopTransportLogger;
230
231impl TransportLogger for NoopTransportLogger {
232    fn debug(&self, _event: &TransportLogEvent) {}
233
234    fn enabled(&self) -> bool {
235        false
236    }
237}
238
239/// Authenticated HTTP client for CLI command implementations.
240///
241/// The client covers the transport behavior command authors usually need: auth
242/// injection, JSON request/response helpers, structured HTTP errors,
243/// idempotent retries, ETag helpers, raw streaming helpers, multipart helpers,
244/// and GraphQL envelope decoding.
245#[derive(Clone, Debug)]
246pub struct HttpClient {
247    base: reqwest::Client,
248    base_url: String,
249    auth: Arc<dyn AuthInjector>,
250    user_agent: String,
251    default_headers: BTreeMap<String, String>,
252    logger: Arc<dyn TransportLogger>,
253}
254
255/// Builder for [`HttpClient`].
256#[derive(Clone, Debug)]
257pub struct HttpClientBuilder {
258    base_url: String,
259    auth: Arc<dyn AuthInjector>,
260    user_agent: String,
261    default_headers: BTreeMap<String, String>,
262    logger: Arc<dyn TransportLogger>,
263}
264
265impl HttpClientBuilder {
266    /// Creates a builder with a base URL and auth injector.
267    #[must_use]
268    pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
269        Self {
270            base_url: base_url.into(),
271            auth,
272            user_agent: default_user_agent(),
273            default_headers: BTreeMap::new(),
274            logger: default_transport_logger(),
275        }
276    }
277
278    /// Sets the user-agent for this client.
279    #[must_use]
280    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
281        self.user_agent = user_agent.into();
282        self
283    }
284
285    /// Alias for [`HttpClientBuilder::user_agent`] for migration readability.
286    #[must_use]
287    pub fn with_user_agent(self, user_agent: impl Into<String>) -> Self {
288        self.user_agent(user_agent)
289    }
290
291    /// Sets headers sent on every request.
292    #[must_use]
293    pub fn default_headers(mut self, headers: BTreeMap<String, String>) -> Self {
294        self.default_headers = headers;
295        self
296    }
297
298    /// Alias for [`HttpClientBuilder::default_headers`] for migration readability.
299    #[must_use]
300    pub fn with_default_headers(self, headers: BTreeMap<String, String>) -> Self {
301        self.default_headers(headers)
302    }
303
304    /// Sets the transport debug logger.
305    #[must_use]
306    pub fn logger(mut self, logger: Arc<dyn TransportLogger>) -> Self {
307        self.logger = logger;
308        self
309    }
310
311    /// Alias for [`HttpClientBuilder::logger`] for migration readability.
312    #[must_use]
313    pub fn with_logger(self, logger: Arc<dyn TransportLogger>) -> Self {
314        self.logger(logger)
315    }
316
317    /// Builds the client.
318    #[must_use]
319    pub fn build(self) -> HttpClient {
320        HttpClient {
321            base: reqwest::Client::new(),
322            base_url: self.base_url,
323            auth: self.auth,
324            user_agent: self.user_agent,
325            default_headers: self.default_headers,
326            logger: self.logger,
327        }
328    }
329}
330
331impl HttpClient {
332    /// Creates a client builder.
333    #[must_use]
334    pub fn builder(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> HttpClientBuilder {
335        HttpClientBuilder::new(base_url, auth)
336    }
337
338    /// Creates a client with default settings.
339    #[must_use]
340    pub fn new(base_url: impl Into<String>, auth: Arc<dyn AuthInjector>) -> Self {
341        HttpClientBuilder::new(base_url, auth).build()
342    }
343
344    /// Sends GET and decodes a JSON response.
345    pub async fn get<T: Default + DeserializeOwned>(&self, path: &str) -> Result<T> {
346        self.do_json(Method::GET, path, Option::<&()>::None).await
347    }
348
349    /// Sends GET and checks only for success.
350    pub async fn get_without_response(&self, path: &str) -> Result<()> {
351        self.do_empty(Method::GET, path, Option::<&()>::None).await
352    }
353
354    /// Sends POST with a JSON body and decodes a JSON response.
355    pub async fn post<B: Serialize, T: Default + DeserializeOwned>(
356        &self,
357        path: &str,
358        body: &B,
359    ) -> Result<T> {
360        self.do_json(Method::POST, path, Some(body)).await
361    }
362
363    /// Sends POST with a JSON body and checks only for success.
364    pub async fn post_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
365        self.do_empty(Method::POST, path, Some(body)).await
366    }
367
368    /// Sends PUT with a JSON body and decodes a JSON response.
369    pub async fn put<B: Serialize, T: Default + DeserializeOwned>(
370        &self,
371        path: &str,
372        body: &B,
373    ) -> Result<T> {
374        self.do_json(Method::PUT, path, Some(body)).await
375    }
376
377    /// Sends PUT with a JSON body and checks only for success.
378    pub async fn put_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
379        self.do_empty(Method::PUT, path, Some(body)).await
380    }
381
382    /// Sends PATCH with a JSON body and decodes a JSON response.
383    pub async fn patch<B: Serialize, T: Default + DeserializeOwned>(
384        &self,
385        path: &str,
386        body: &B,
387    ) -> Result<T> {
388        self.do_json(Method::PATCH, path, Some(body)).await
389    }
390
391    /// Sends PATCH with a JSON body and checks only for success.
392    pub async fn patch_without_response<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
393        self.do_empty(Method::PATCH, path, Some(body)).await
394    }
395
396    /// Sends DELETE and checks for success.
397    pub async fn delete(&self, path: &str) -> Result<()> {
398        self.do_empty(Method::DELETE, path, Option::<&()>::None)
399            .await
400    }
401
402    /// Sends DELETE with a JSON body and checks for success.
403    pub async fn delete_with_body<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
404        self.do_empty(Method::DELETE, path, Some(body)).await
405    }
406
407    /// Sends GET and returns decoded JSON plus the ETag header.
408    pub async fn get_etag<T: Default + DeserializeOwned>(&self, path: &str) -> Result<(T, String)> {
409        let response = self.send_get_status_only_retry(path).await?;
410        let etag = response
411            .headers()
412            .get(header::ETAG)
413            .and_then(|value| value.to_str().ok())
414            .unwrap_or_default()
415            .to_owned();
416        let value = self.decode_json_response(response, "GET", path).await?;
417        Ok((value, etag))
418    }
419
420    /// Sends GET and returns only the ETag header after checking success.
421    pub async fn get_etag_without_response(&self, path: &str) -> Result<String> {
422        let response = self.send_get_status_only_retry(path).await?;
423        let etag = response
424            .headers()
425            .get(header::ETAG)
426            .and_then(|value| value.to_str().ok())
427            .unwrap_or_default()
428            .to_owned();
429        self.ensure_success_response(response, "GET", path).await?;
430        Ok(etag)
431    }
432
433    /// Sends PUT with `If-Match` and decodes a JSON response.
434    pub async fn put_if_match<B: Serialize, T: Default + DeserializeOwned>(
435        &self,
436        path: &str,
437        body: &B,
438        etag: &str,
439    ) -> Result<T> {
440        let response = self.send_put_if_match(path, body, etag).await?;
441        self.decode_json_response(response, "PUT", path).await
442    }
443
444    /// Sends PUT with `If-Match` and checks only for success.
445    pub async fn put_if_match_without_response<B: Serialize>(
446        &self,
447        path: &str,
448        body: &B,
449        etag: &str,
450    ) -> Result<()> {
451        let response = self.send_put_if_match(path, body, etag).await?;
452        self.ensure_success_response(response, "PUT", path).await
453    }
454
455    /// Streams a raw GET response body into a writer.
456    pub async fn get_raw(&self, path: &str, writer: &mut dyn Write) -> Result<()> {
457        let response = self.send_get_raw_status_only_retry(path).await?;
458        let (status, bytes) = self
459            .read_and_log_response(response, "GET", path, false)
460            .await?;
461        if status.is_client_error() || status.is_server_error() {
462            return Err(
463                parse_error_body(status, &String::from_utf8_lossy(&bytes), "GET", path).into(),
464            );
465        }
466        writer.write_all(&bytes)?;
467        Ok(())
468    }
469
470    /// Sends GET and returns the raw response body as bytes.
471    pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
472        let response = self.send_get_raw_status_only_retry(path).await?;
473        let (status, bytes) = self
474            .read_and_log_response(response, "GET", path, false)
475            .await?;
476        if status.is_client_error() || status.is_server_error() {
477            return Err(
478                parse_error_body(status, &String::from_utf8_lossy(&bytes), "GET", path).into(),
479            );
480        }
481        Ok(bytes.to_vec())
482    }
483
484    /// Sends POST and streams the raw response body into a writer.
485    pub async fn post_raw<B: Serialize>(
486        &self,
487        path: &str,
488        body: Option<&B>,
489        writer: &mut dyn Write,
490    ) -> Result<()> {
491        let response = self.send_post_raw_once(path, body).await?;
492        let (status, bytes) = self
493            .read_and_log_response(response, "POST", path, false)
494            .await?;
495        if status.is_client_error() || status.is_server_error() {
496            return Err(
497                parse_error_body(status, &String::from_utf8_lossy(&bytes), "POST", path).into(),
498            );
499        }
500        writer.write_all(&bytes)?;
501        Ok(())
502    }
503
504    /// Sends a raw-body request and decodes a JSON response.
505    pub async fn do_raw<T: Default + DeserializeOwned>(
506        &self,
507        method: Method,
508        path: &str,
509        content_type: &str,
510        body: impl Into<Vec<u8>>,
511    ) -> Result<T> {
512        self.do_raw_optional_body(method, path, content_type, Some(body.into()))
513            .await
514    }
515
516    /// Sends an optional raw-body request and decodes a JSON response.
517    pub async fn do_raw_optional_body<T: Default + DeserializeOwned>(
518        &self,
519        method: Method,
520        path: &str,
521        content_type: &str,
522        body: Option<Vec<u8>>,
523    ) -> Result<T> {
524        let method_text = method.as_str().to_owned();
525        let response = self.send_raw_once(method, path, content_type, body).await?;
526        self.decode_json_response(response, &method_text, path)
527            .await
528    }
529
530    /// Sends a raw-body request and checks only for success.
531    pub async fn do_raw_without_response(
532        &self,
533        method: Method,
534        path: &str,
535        content_type: &str,
536        body: impl Into<Vec<u8>>,
537    ) -> Result<()> {
538        self.do_raw_optional_body_without_response(method, path, content_type, Some(body.into()))
539            .await
540    }
541
542    /// Sends an optional raw-body request and checks only for success.
543    pub async fn do_raw_optional_body_without_response(
544        &self,
545        method: Method,
546        path: &str,
547        content_type: &str,
548        body: Option<Vec<u8>>,
549    ) -> Result<()> {
550        let method_text = method.as_str().to_owned();
551        let response = self.send_raw_once(method, path, content_type, body).await?;
552        self.ensure_success_response(response, &method_text, path)
553            .await
554    }
555
556    /// Sends a multipart file upload and decodes a JSON response.
557    pub async fn post_multipart<T: Default + DeserializeOwned>(
558        &self,
559        path: &str,
560        field_name: &str,
561        file_path: &Path,
562    ) -> Result<T> {
563        self.post_multipart_with_fields(path, field_name, file_path, &BTreeMap::new())
564            .await
565    }
566
567    /// Sends a multipart file upload and checks only for success.
568    pub async fn post_multipart_without_response(
569        &self,
570        path: &str,
571        field_name: &str,
572        file_path: &Path,
573    ) -> Result<()> {
574        self.post_multipart_with_fields_without_response(
575            path,
576            field_name,
577            file_path,
578            &BTreeMap::new(),
579        )
580        .await
581    }
582
583    /// Sends a multipart file upload with fields and decodes a JSON response.
584    pub async fn post_multipart_with_fields<T: Default + DeserializeOwned>(
585        &self,
586        path: &str,
587        file_field: &str,
588        file_path: &Path,
589        fields: &BTreeMap<String, String>,
590    ) -> Result<T> {
591        let form = self.multipart_form(file_field, file_path, fields).await?;
592        self.send_multipart(path, form).await
593    }
594
595    async fn multipart_form(
596        &self,
597        file_field: &str,
598        file_path: &Path,
599        fields: &BTreeMap<String, String>,
600    ) -> Result<reqwest::multipart::Form> {
601        let mut form = reqwest::multipart::Form::new();
602        for (key, value) in fields {
603            form = form.text(key.clone(), value.clone());
604        }
605        let file_name = file_path
606            .file_name()
607            .and_then(|name| name.to_str())
608            .unwrap_or("file")
609            .to_owned();
610        let bytes = tokio::fs::read(file_path)
611            .await
612            .map_err(|err| CliCoreError::message(format!("transport: open file: {err}")))?;
613        let part = reqwest::multipart::Part::bytes(bytes).file_name(file_name);
614        form = form.part(file_field.to_owned(), part);
615        Ok(form)
616    }
617
618    /// Sends a multipart file upload with fields and checks only for success.
619    pub async fn post_multipart_with_fields_without_response(
620        &self,
621        path: &str,
622        file_field: &str,
623        file_path: &Path,
624        fields: &BTreeMap<String, String>,
625    ) -> Result<()> {
626        let form = self.multipart_form(file_field, file_path, fields).await?;
627        self.send_multipart_without_response(path, form).await
628    }
629
630    /// Sends multipart form fields without a file and decodes a JSON response.
631    pub async fn post_multipart_fields<T: Default + DeserializeOwned>(
632        &self,
633        path: &str,
634        fields: &BTreeMap<String, String>,
635    ) -> Result<T> {
636        let mut form = reqwest::multipart::Form::new();
637        for (key, value) in fields {
638            form = form.text(key.clone(), value.clone());
639        }
640        self.send_multipart(path, form).await
641    }
642
643    /// Sends multipart form fields without a file and checks only for success.
644    pub async fn post_multipart_fields_without_response(
645        &self,
646        path: &str,
647        fields: &BTreeMap<String, String>,
648    ) -> Result<()> {
649        let mut form = reqwest::multipart::Form::new();
650        for (key, value) in fields {
651            form = form.text(key.clone(), value.clone());
652        }
653        self.send_multipart_without_response(path, form).await
654    }
655
656    /// Sends a GraphQL request and decodes the `data` envelope into a value.
657    pub async fn post_graphql<T: DeserializeOwned + Default>(
658        &self,
659        path: &str,
660        query: &str,
661        variables: BTreeMap<String, Value>,
662    ) -> Result<T> {
663        self.post_graphql_optional_variables(path, query, Some(variables))
664            .await
665    }
666
667    /// Sends a GraphQL request with optional variables and decodes `data`.
668    pub async fn post_graphql_optional_variables<T: DeserializeOwned + Default>(
669        &self,
670        path: &str,
671        query: &str,
672        variables: Option<BTreeMap<String, Value>>,
673    ) -> Result<T> {
674        let mut result = T::default();
675        self.post_graphql_optional_variables_into(path, query, variables, &mut result)
676            .await?;
677        Ok(result)
678    }
679
680    /// Sends a GraphQL request and checks only for GraphQL/HTTP success.
681    pub async fn post_graphql_without_response(
682        &self,
683        path: &str,
684        query: &str,
685        variables: BTreeMap<String, Value>,
686    ) -> Result<()> {
687        self.post_graphql_optional_variables_without_response(path, query, Some(variables))
688            .await
689    }
690
691    /// Sends a GraphQL request with optional variables and checks only for success.
692    pub async fn post_graphql_optional_variables_without_response(
693        &self,
694        path: &str,
695        query: &str,
696        variables: Option<BTreeMap<String, Value>>,
697    ) -> Result<()> {
698        self.post_graphql_response_envelope(path, query, variables)
699            .await?;
700        Ok(())
701    }
702
703    /// Sends a GraphQL request and decodes `data` into an existing value.
704    pub async fn post_graphql_into<T: DeserializeOwned>(
705        &self,
706        path: &str,
707        query: &str,
708        variables: BTreeMap<String, Value>,
709        result: &mut T,
710    ) -> Result<()> {
711        self.post_graphql_optional_variables_into(path, query, Some(variables), result)
712            .await
713    }
714
715    /// Sends a GraphQL request with optional variables and decodes into an existing value.
716    pub async fn post_graphql_optional_variables_into<T: DeserializeOwned>(
717        &self,
718        path: &str,
719        query: &str,
720        variables: Option<BTreeMap<String, Value>>,
721        result: &mut T,
722    ) -> Result<()> {
723        let envelope = self
724            .post_graphql_response_envelope(path, query, variables)
725            .await?;
726        if let Some(data) = envelope.data
727            && !data.is_null()
728        {
729            *result = serde_json::from_value(data).map_err(|err| {
730                CliCoreError::message(format!("transport: decode graphql data: {err}"))
731            })?;
732        }
733        Ok(())
734    }
735
736    async fn do_json<B: Serialize, T: Default + DeserializeOwned>(
737        &self,
738        method: Method,
739        path: &str,
740        body: Option<&B>,
741    ) -> Result<T> {
742        let method_text = method.as_str().to_owned();
743        let response = self.send_with_retry(method, path, body).await?;
744        self.decode_json_response(response, &method_text, path)
745            .await
746    }
747
748    async fn post_graphql_response_envelope(
749        &self,
750        path: &str,
751        query: &str,
752        variables: Option<BTreeMap<String, Value>>,
753    ) -> Result<GraphQlEnvelope> {
754        #[derive(Serialize)]
755        struct Request<'query> {
756            query: &'query str,
757            variables: Option<BTreeMap<String, Value>>,
758        }
759
760        let envelope: GraphQlEnvelope = self.post(path, &Request { query, variables }).await?;
761        if !envelope.errors.is_empty() {
762            let message = envelope
763                .errors
764                .iter()
765                .map(|error| error.message.as_str())
766                .collect::<Vec<_>>()
767                .join("; ");
768            return Err(CliCoreError::message(format!("graphql: {message}")));
769        }
770        Ok(envelope)
771    }
772
773    async fn send_put_if_match<B: Serialize>(
774        &self,
775        path: &str,
776        body: &B,
777        etag: &str,
778    ) -> Result<reqwest::Response> {
779        let mut request = self
780            .build_request(Method::PUT, path, Some(body))?
781            .header(header::IF_MATCH, etag)
782            .build()
783            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
784        self.inject_auth(&mut request).await?;
785        self.log_request(&request);
786        self.base
787            .execute(request)
788            .await
789            .map_err(|err| CliCoreError::message(format!("transport: PUT {path}: {err}")))
790    }
791
792    async fn send_multipart<T: Default + DeserializeOwned>(
793        &self,
794        path: &str,
795        form: reqwest::multipart::Form,
796    ) -> Result<T> {
797        let response = self.send_multipart_response(path, form).await?;
798        self.decode_json_response(response, "POST", path).await
799    }
800
801    async fn send_multipart_without_response(
802        &self,
803        path: &str,
804        form: reqwest::multipart::Form,
805    ) -> Result<()> {
806        let response = self.send_multipart_response(path, form).await?;
807        self.ensure_success_response(response, "POST", path).await
808    }
809
810    async fn send_multipart_response(
811        &self,
812        path: &str,
813        form: reqwest::multipart::Form,
814    ) -> Result<reqwest::Response> {
815        let url = format!("{}{}", self.base_url, path);
816        let mut builder = self
817            .base
818            .post(url)
819            .header(header::USER_AGENT, self.user_agent.clone())
820            .multipart(form);
821        for (key, value) in &self.default_headers {
822            builder = builder.header(key, value);
823        }
824        let mut request = builder
825            .build()
826            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
827        self.inject_auth(&mut request).await?;
828        self.log_request(&request);
829        self.base
830            .execute(request)
831            .await
832            .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
833    }
834
835    async fn do_empty<B: Serialize>(
836        &self,
837        method: Method,
838        path: &str,
839        body: Option<&B>,
840    ) -> Result<()> {
841        let method_text = method.as_str().to_owned();
842        let response = self.send_with_retry(method, path, body).await?;
843        self.ensure_success_response(response, &method_text, path)
844            .await
845    }
846
847    async fn send_raw_once(
848        &self,
849        method: Method,
850        path: &str,
851        content_type: &str,
852        body: Option<Vec<u8>>,
853    ) -> Result<reqwest::Response> {
854        let url = format!("{}{}", self.base_url, path);
855        let method_text = method.as_str().to_owned();
856        let mut builder = self
857            .base
858            .request(method, url)
859            .header(header::USER_AGENT, self.user_agent.clone());
860        if let Some(body) = body {
861            builder = builder.body(body);
862        }
863        if !content_type.is_empty() {
864            builder = builder.header(header::CONTENT_TYPE, content_type);
865        }
866        for (key, value) in &self.default_headers {
867            builder = builder.header(key, value);
868        }
869        let mut request = builder
870            .build()
871            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
872        self.inject_auth(&mut request).await?;
873        self.log_request(&request);
874        self.base
875            .execute(request)
876            .await
877            .map_err(|err| CliCoreError::message(format!("transport: {method_text} {path}: {err}")))
878    }
879
880    async fn send_get_raw_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
881        let mut last_err = None;
882        for attempt in 0..MAX_RETRIES {
883            if attempt > 0 {
884                let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
885                time::sleep(backoff).await;
886            }
887
888            match self.send_get_raw_once(path).await {
889                Ok(response) => {
890                    let status = response.status();
891                    if status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error() {
892                        self.log_response("GET", path, status, response.headers(), None, None);
893                        last_err = Some(CliCoreError::message(format!(
894                            "transport: GET {}: status {}",
895                            path,
896                            status.as_u16()
897                        )));
898                        continue;
899                    }
900                    return Ok(response);
901                }
902                Err(err) => last_err = Some(err),
903            }
904        }
905        Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
906    }
907
908    async fn send_get_raw_once(&self, path: &str) -> Result<reqwest::Response> {
909        let url = format!("{}{}", self.base_url, path);
910        let mut builder = self
911            .base
912            .get(url)
913            .header(header::USER_AGENT, self.user_agent.clone());
914        for (key, value) in &self.default_headers {
915            builder = builder.header(key, value);
916        }
917        let mut request = builder
918            .build()
919            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
920        self.inject_auth(&mut request).await?;
921        self.log_request(&request);
922        self.base
923            .execute(request)
924            .await
925            .map_err(|err| CliCoreError::message(format!("transport: GET {path}: {err}")))
926    }
927
928    async fn send_post_raw_once<B: Serialize>(
929        &self,
930        path: &str,
931        body: Option<&B>,
932    ) -> Result<reqwest::Response> {
933        let mut request = self
934            .build_request(Method::POST, path, body)?
935            .build()
936            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
937        self.inject_auth(&mut request).await?;
938        self.log_request(&request);
939        self.base
940            .execute(request)
941            .await
942            .map_err(|err| CliCoreError::message(format!("transport: POST {path}: {err}")))
943    }
944
945    async fn send_with_retry<B: Serialize>(
946        &self,
947        method: Method,
948        path: &str,
949        body: Option<&B>,
950    ) -> Result<reqwest::Response> {
951        let mut last_err = None;
952        for attempt in 0..MAX_RETRIES {
953            if attempt > 0 {
954                let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
955                self.log_debug(
956                    "retrying request",
957                    [
958                        ("attempt", (attempt + 1).to_string()),
959                        ("backoff", format!("{backoff:?}")),
960                    ],
961                );
962                time::sleep(backoff).await;
963            }
964
965            match self.send_once(method.clone(), path, body).await {
966                Ok(response) => {
967                    if retryable_status(method.clone(), response.status()) {
968                        last_err = Some(
969                            self.retryable_status_error(response, method.as_str(), path)
970                                .await,
971                        );
972                        continue;
973                    }
974                    return Ok(response);
975                }
976                Err(err) if is_idempotent(&method) => {
977                    last_err = Some(err);
978                }
979                Err(err) => return Err(err),
980            }
981        }
982        Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
983    }
984
985    async fn send_get_status_only_retry(&self, path: &str) -> Result<reqwest::Response> {
986        let mut last_err = None;
987        for attempt in 0..MAX_RETRIES {
988            if attempt > 0 {
989                let backoff = BASE_BACKOFF * 2_u32.pow(u32::try_from(attempt - 1).unwrap_or(0));
990                self.log_debug(
991                    "retrying request",
992                    [
993                        ("attempt", (attempt + 1).to_string()),
994                        ("backoff", format!("{backoff:?}")),
995                    ],
996                );
997                time::sleep(backoff).await;
998            }
999
1000            match self.send_once(Method::GET, path, Option::<&()>::None).await {
1001                Ok(response) => {
1002                    let status = response.status();
1003                    if status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error() {
1004                        self.log_response("GET", path, status, response.headers(), None, None);
1005                        last_err = Some(CliCoreError::message(format!(
1006                            "transport: GET {}: status {}",
1007                            path,
1008                            status.as_u16()
1009                        )));
1010                        continue;
1011                    }
1012                    return Ok(response);
1013                }
1014                Err(err) => last_err = Some(err),
1015            }
1016        }
1017        Err(last_err.unwrap_or_else(|| CliCoreError::message("transport: retry failed")))
1018    }
1019
1020    async fn send_once<B: Serialize>(
1021        &self,
1022        method: Method,
1023        path: &str,
1024        body: Option<&B>,
1025    ) -> Result<reqwest::Response> {
1026        let mut request = self
1027            .build_request(method.clone(), path, body)?
1028            .build()
1029            .map_err(|err| CliCoreError::message(format!("transport: create request: {err}")))?;
1030        self.inject_auth(&mut request).await?;
1031        let method_text = method.as_str().to_owned();
1032        self.log_request(&request);
1033        self.base
1034            .execute(request)
1035            .await
1036            .map_err(|err| CliCoreError::message(format!("transport: {method_text} {path}: {err}")))
1037    }
1038
1039    fn build_request<B: Serialize>(
1040        &self,
1041        method: Method,
1042        path: &str,
1043        body: Option<&B>,
1044    ) -> Result<reqwest::RequestBuilder> {
1045        let url = format!("{}{}", self.base_url, path);
1046        let mut builder = self
1047            .base
1048            .request(method, url)
1049            .header(header::USER_AGENT, self.user_agent.clone());
1050        if let Some(body) = body {
1051            let body = serde_json::to_vec(body)
1052                .map_err(|err| CliCoreError::message(format!("transport: marshal body: {err}")))?;
1053            builder = builder
1054                .header(header::CONTENT_TYPE, "application/json")
1055                .body(body);
1056        }
1057        for (key, value) in &self.default_headers {
1058            builder = builder.header(key, value);
1059        }
1060        Ok(builder)
1061    }
1062
1063    fn log_debug(
1064        &self,
1065        message: &'static str,
1066        fields: impl IntoIterator<Item = (&'static str, String)>,
1067    ) {
1068        if !self.logger.enabled() {
1069            return;
1070        }
1071        self.logger.debug(&TransportLogEvent {
1072            message,
1073            fields: fields
1074                .into_iter()
1075                .map(|(key, value)| (key.to_owned(), value))
1076                .collect(),
1077            headers: None,
1078            body: None,
1079        });
1080    }
1081
1082    /// Emits an `http request` event capturing the built request's headers and
1083    /// in-memory body. Streaming bodies (e.g. multipart) report no body.
1084    ///
1085    /// Skips capture entirely when the logger is disabled, so the non-debug path
1086    /// does not clone headers or copy request bodies.
1087    fn log_request(&self, request: &reqwest::Request) {
1088        if !self.logger.enabled() {
1089            return;
1090        }
1091        self.logger.debug(&TransportLogEvent {
1092            message: "http request",
1093            fields: BTreeMap::from([
1094                ("method".to_owned(), request.method().as_str().to_owned()),
1095                ("url".to_owned(), request.url().as_str().to_owned()),
1096            ]),
1097            headers: Some(header_pairs(request.headers())),
1098            body: request
1099                .body()
1100                .and_then(reqwest::Body::as_bytes)
1101                .map(<[u8]>::to_vec),
1102        });
1103    }
1104
1105    /// Emits an `http response` event. When `body` is `None`, `body_bytes`
1106    /// records the payload size instead (used for raw/byte-download paths so
1107    /// large responses are not buffered into the log).
1108    fn log_response(
1109        &self,
1110        method: &str,
1111        path: &str,
1112        status: StatusCode,
1113        headers: &header::HeaderMap,
1114        body: Option<&[u8]>,
1115        body_bytes: Option<usize>,
1116    ) {
1117        if !self.logger.enabled() {
1118            return;
1119        }
1120        let mut fields = BTreeMap::from([
1121            ("status".to_owned(), status.as_u16().to_string()),
1122            ("method".to_owned(), method.to_owned()),
1123            ("url".to_owned(), format!("{}{}", self.base_url, path)),
1124        ]);
1125        if let Some(len) = body_bytes {
1126            fields.insert("body_bytes".to_owned(), len.to_string());
1127        }
1128        self.logger.debug(&TransportLogEvent {
1129            message: "http response",
1130            fields,
1131            headers: Some(header_pairs(headers)),
1132            body: body.map(<[u8]>::to_vec),
1133        });
1134    }
1135
1136    /// Reads a response body once, emits the `http response` event, and returns
1137    /// the status and buffered bytes. `include_body` controls whether the body
1138    /// is attached to the log or only its size is reported.
1139    ///
1140    /// Returns the body as [`Bytes`] (a cheap clone of the buffer `reqwest`
1141    /// already owns) so callers decode without an extra copy. When the logger is
1142    /// disabled, response headers are not cloned and no event is built.
1143    async fn read_and_log_response(
1144        &self,
1145        response: reqwest::Response,
1146        method: &str,
1147        path: &str,
1148        include_body: bool,
1149    ) -> Result<(StatusCode, Bytes)> {
1150        let status = response.status();
1151        let logging = self.logger.enabled();
1152        let headers = logging.then(|| response.headers().clone());
1153        let body = response.bytes().await.map_err(|err| {
1154            CliCoreError::message(format!("transport: read response body: {err}"))
1155        })?;
1156        if let Some(headers) = headers {
1157            if include_body {
1158                self.log_response(method, path, status, &headers, Some(&body), None);
1159            } else {
1160                self.log_response(method, path, status, &headers, None, Some(body.len()));
1161            }
1162        }
1163        Ok((status, body))
1164    }
1165
1166    async fn inject_auth(&self, request: &mut reqwest::Request) -> Result<()> {
1167        self.auth
1168            .inject(request)
1169            .await
1170            .map_err(|err| CliCoreError::message(format!("transport: auth inject: {err}")))
1171    }
1172
1173    async fn decode_json_response<T: Default + DeserializeOwned>(
1174        &self,
1175        response: reqwest::Response,
1176        method: &str,
1177        path: &str,
1178    ) -> Result<T> {
1179        let (status, body) = self
1180            .read_and_log_response(response, method, path, true)
1181            .await?;
1182        if status.is_client_error() || status.is_server_error() {
1183            return Err(
1184                parse_error_body(status, &String::from_utf8_lossy(&body), method, path).into(),
1185            );
1186        }
1187        if status == StatusCode::NO_CONTENT {
1188            return Ok(T::default());
1189        }
1190        if body.trim_ascii() == b"null" {
1191            return Ok(T::default());
1192        }
1193        serde_json::from_slice::<T>(&body)
1194            .map_err(|err| CliCoreError::message(format!("transport: decode response: {err}")))
1195    }
1196
1197    async fn ensure_success_response(
1198        &self,
1199        response: reqwest::Response,
1200        method: &str,
1201        path: &str,
1202    ) -> Result<()> {
1203        // A `*_without_response` call discards the body, so the body is only
1204        // needed to build an error message or to feed the logger. When neither
1205        // applies (non-error status, logging disabled), skip buffering it —
1206        // matching the pre-logging behavior of not reading success bodies.
1207        let is_error = response.status().is_client_error() || response.status().is_server_error();
1208        if !is_error && !self.logger.enabled() {
1209            return Ok(());
1210        }
1211        let (status, body) = self
1212            .read_and_log_response(response, method, path, true)
1213            .await?;
1214        if status.is_client_error() || status.is_server_error() {
1215            return Err(
1216                parse_error_body(status, &String::from_utf8_lossy(&body), method, path).into(),
1217            );
1218        }
1219        Ok(())
1220    }
1221
1222    async fn retryable_status_error(
1223        &self,
1224        response: reqwest::Response,
1225        method: &str,
1226        path: &str,
1227    ) -> CliCoreError {
1228        let status = response.status();
1229        let headers = self.logger.enabled().then(|| response.headers().clone());
1230        match response.bytes().await {
1231            Ok(body) => {
1232                if let Some(headers) = &headers {
1233                    self.log_response(method, path, status, headers, Some(&body), None);
1234                }
1235                CliCoreError::message(format!(
1236                    "transport: {method} {path}: status {}: {}",
1237                    status.as_u16(),
1238                    String::from_utf8_lossy(&body)
1239                ))
1240            }
1241            Err(err) => {
1242                if let Some(headers) = &headers {
1243                    self.log_response(method, path, status, headers, None, None);
1244                }
1245                CliCoreError::message(format!(
1246                    "transport: {method} {path}: status {} (body read failed: {err})",
1247                    status.as_u16()
1248                ))
1249            }
1250        }
1251    }
1252}
1253
1254/// Converts a `reqwest` header map into owned name/value pairs for logging.
1255///
1256/// Header values that are not valid UTF-8 are rendered as a byte-count
1257/// placeholder rather than dropped, so the trace still shows the header exists.
1258fn header_pairs(headers: &header::HeaderMap) -> Vec<(String, String)> {
1259    headers
1260        .iter()
1261        .map(|(name, value)| {
1262            let value = value.to_str().map_or_else(
1263                |_| format!("<{} non-utf8 bytes>", value.as_bytes().len()),
1264                str::to_owned,
1265            );
1266            (name.as_str().to_owned(), value)
1267        })
1268        .collect()
1269}
1270
1271/// Converts a non-success HTTP response into the shared transport error shape.
1272///
1273/// If the response body already contains an API-style error document, the
1274/// service message is preserved and the HTTP status is normalized into the
1275/// error code. Otherwise the method, path, status, and response body are folded
1276/// into a readable fallback message.
1277pub async fn parse_error_response(response: reqwest::Response, method: &str, path: &str) -> Error {
1278    let status = response.status();
1279    let body = response.text().await.unwrap_or_default();
1280    parse_error_body(status, &body, method, path)
1281}
1282
1283fn parse_error_body(status: StatusCode, body: &str, method: &str, path: &str) -> Error {
1284    if let Ok(mut api_error) = serde_json::from_str::<Error>(body)
1285        && !api_error.message.is_empty()
1286    {
1287        api_error.code = format!("HTTP_{}", status.as_u16());
1288        return api_error;
1289    }
1290    Error {
1291        code: format!("HTTP_{}", status.as_u16()),
1292        message: format!("{} {}: {} {}", method, path, status.as_u16(), body),
1293        system: String::new(),
1294        request_id: String::new(),
1295    }
1296}
1297
1298fn retryable_status(method: Method, status: StatusCode) -> bool {
1299    status == StatusCode::TOO_MANY_REQUESTS || (status.is_server_error() && is_idempotent(&method))
1300}
1301
1302fn is_idempotent(method: &Method) -> bool {
1303    matches!(*method, Method::GET | Method::HEAD | Method::DELETE)
1304}