Skip to main content

dd_api/
client.rs

1use std::time::Duration;
2
3use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
4use serde::de::DeserializeOwned;
5use serde::Serialize;
6use tracing::{debug, warn};
7use url::Url;
8
9use crate::error::{ApiError, Result};
10
11const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
12const MAX_RETRIES: u32 = 3;
13
14pub struct ClientBuilder {
15    api_key: String,
16    app_key: String,
17    site: String,
18    user_agent: String,
19    timeout: Duration,
20}
21
22impl ClientBuilder {
23    pub fn new(api_key: impl Into<String>, app_key: impl Into<String>) -> Self {
24        Self {
25            api_key: api_key.into(),
26            app_key: app_key.into(),
27            site: "datadoghq.com".to_string(),
28            user_agent: format!("ddog-cli/{}", env!("CARGO_PKG_VERSION")),
29            timeout: DEFAULT_TIMEOUT,
30        }
31    }
32
33    pub fn site(mut self, site: impl Into<String>) -> Self {
34        self.site = site.into();
35        self
36    }
37
38    pub fn user_agent(mut self, ua: impl Into<String>) -> Self {
39        self.user_agent = ua.into();
40        self
41    }
42
43    pub fn timeout(mut self, timeout: Duration) -> Self {
44        self.timeout = timeout;
45        self
46    }
47
48    pub fn build(self) -> Result<Client> {
49        let mut headers = HeaderMap::new();
50        headers.insert(
51            HeaderName::from_static("dd-api-key"),
52            HeaderValue::from_str(&self.api_key)
53                .map_err(|_| ApiError::Upstream { status: 0, body: "invalid API key header".into() })?,
54        );
55        headers.insert(
56            HeaderName::from_static("dd-application-key"),
57            HeaderValue::from_str(&self.app_key)
58                .map_err(|_| ApiError::Upstream { status: 0, body: "invalid App key header".into() })?,
59        );
60
61        let http = reqwest::Client::builder()
62            .default_headers(headers)
63            .user_agent(self.user_agent)
64            .timeout(self.timeout)
65            .build()?;
66
67        let base = Url::parse(&format!("https://api.{}/", self.site))?;
68        Ok(Client { http, base })
69    }
70}
71
72pub struct Client {
73    http: reqwest::Client,
74    base: Url,
75}
76
77impl Client {
78    pub fn base_url(&self) -> &Url {
79        &self.base
80    }
81
82    fn url(&self, path: &str) -> Result<Url> {
83        Ok(self.base.join(path.trim_start_matches('/'))?)
84    }
85
86    pub async fn post_json<B, R>(&self, path: &str, body: &B) -> Result<R>
87    where
88        B: Serialize + ?Sized,
89        R: DeserializeOwned,
90    {
91        let url = self.url(path)?;
92        self.execute_with_retry(|| {
93            self.http.post(url.clone()).json(body).send()
94        })
95        .await
96    }
97
98    pub async fn get_json<R>(&self, path: &str, query: &[(&str, String)]) -> Result<R>
99    where
100        R: DeserializeOwned,
101    {
102        let url = self.url(path)?;
103        self.execute_with_retry(|| {
104            let mut req = self.http.get(url.clone());
105            if !query.is_empty() {
106                req = req.query(query);
107            }
108            req.send()
109        })
110        .await
111    }
112
113    async fn execute_with_retry<F, Fut, R>(&self, mut send: F) -> Result<R>
114    where
115        F: FnMut() -> Fut,
116        Fut: std::future::Future<Output = std::result::Result<reqwest::Response, reqwest::Error>>,
117        R: DeserializeOwned,
118    {
119        let mut attempt = 0u32;
120        loop {
121            attempt += 1;
122            let res = send().await;
123            let resp = match res {
124                Ok(r) => r,
125                Err(e) => {
126                    if attempt < MAX_RETRIES && (e.is_timeout() || e.is_connect()) {
127                        let backoff = backoff_for(attempt);
128                        warn!(attempt, ?backoff, error=%e, "transient network error; retrying");
129                        tokio::time::sleep(backoff).await;
130                        continue;
131                    }
132                    return Err(ApiError::from(e));
133                }
134            };
135
136            let status = resp.status();
137            if status.is_success() {
138                let text = resp.text().await?;
139                debug!(bytes = text.len(), "response body");
140                return Ok(serde_json::from_str::<R>(&text)?);
141            }
142
143            let retry_after = resp
144                .headers()
145                .get("retry-after")
146                .and_then(|v| v.to_str().ok())
147                .and_then(|v| v.parse::<u64>().ok());
148
149            let code = status.as_u16();
150            let body = resp.text().await.unwrap_or_default();
151
152            match code {
153                401 | 403 => return Err(ApiError::Auth),
154                404 => return Err(ApiError::NotFound(body)),
155                429 => {
156                    if attempt < MAX_RETRIES {
157                        let wait = retry_after.map(Duration::from_secs).unwrap_or(backoff_for(attempt));
158                        warn!(attempt, ?wait, "rate limited; retrying");
159                        tokio::time::sleep(wait).await;
160                        continue;
161                    }
162                    return Err(ApiError::RateLimited {
163                        retry_after_secs: retry_after.unwrap_or(0),
164                    });
165                }
166                500..=599 => {
167                    if attempt < MAX_RETRIES {
168                        let wait = backoff_for(attempt);
169                        warn!(attempt, status=code, ?wait, "upstream 5xx; retrying");
170                        tokio::time::sleep(wait).await;
171                        continue;
172                    }
173                    return Err(ApiError::Upstream { status: code, body });
174                }
175                _ => return Err(ApiError::Upstream { status: code, body }),
176            }
177        }
178    }
179}
180
181fn backoff_for(attempt: u32) -> Duration {
182    // 500ms, 1s, 2s
183    Duration::from_millis(500u64 * 2u64.pow(attempt.saturating_sub(1)))
184}