1use std::time::Duration;
2
3use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
4use serde::de::DeserializeOwned;
5use serde::Serialize;
6use tracing::{debug, warn};
7use url::Url;
8
9use crate::error::{ApiError, Result};
10
11const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
12const MAX_RETRIES: u32 = 3;
13
14pub struct ClientBuilder {
15 api_key: String,
16 app_key: String,
17 site: String,
18 user_agent: String,
19 timeout: Duration,
20}
21
22impl ClientBuilder {
23 pub fn new(api_key: impl Into<String>, app_key: impl Into<String>) -> Self {
24 Self {
25 api_key: api_key.into(),
26 app_key: app_key.into(),
27 site: "datadoghq.com".to_string(),
28 user_agent: format!("ddog-cli/{}", env!("CARGO_PKG_VERSION")),
29 timeout: DEFAULT_TIMEOUT,
30 }
31 }
32
33 pub fn site(mut self, site: impl Into<String>) -> Self {
34 self.site = site.into();
35 self
36 }
37
38 pub fn user_agent(mut self, ua: impl Into<String>) -> Self {
39 self.user_agent = ua.into();
40 self
41 }
42
43 pub fn timeout(mut self, timeout: Duration) -> Self {
44 self.timeout = timeout;
45 self
46 }
47
48 pub fn build(self) -> Result<Client> {
49 let mut headers = HeaderMap::new();
50 headers.insert(
51 HeaderName::from_static("dd-api-key"),
52 HeaderValue::from_str(&self.api_key)
53 .map_err(|_| ApiError::Upstream { status: 0, body: "invalid API key header".into() })?,
54 );
55 headers.insert(
56 HeaderName::from_static("dd-application-key"),
57 HeaderValue::from_str(&self.app_key)
58 .map_err(|_| ApiError::Upstream { status: 0, body: "invalid App key header".into() })?,
59 );
60
61 let http = reqwest::Client::builder()
62 .default_headers(headers)
63 .user_agent(self.user_agent)
64 .timeout(self.timeout)
65 .build()?;
66
67 let base = Url::parse(&format!("https://api.{}/", self.site))?;
68 Ok(Client { http, base })
69 }
70}
71
72pub struct Client {
73 http: reqwest::Client,
74 base: Url,
75}
76
77impl Client {
78 pub fn base_url(&self) -> &Url {
79 &self.base
80 }
81
82 fn url(&self, path: &str) -> Result<Url> {
83 Ok(self.base.join(path.trim_start_matches('/'))?)
84 }
85
86 pub async fn post_json<B, R>(&self, path: &str, body: &B) -> Result<R>
87 where
88 B: Serialize + ?Sized,
89 R: DeserializeOwned,
90 {
91 let url = self.url(path)?;
92 self.execute_with_retry(|| {
93 self.http.post(url.clone()).json(body).send()
94 })
95 .await
96 }
97
98 pub async fn get_json<R>(&self, path: &str, query: &[(&str, String)]) -> Result<R>
99 where
100 R: DeserializeOwned,
101 {
102 let url = self.url(path)?;
103 self.execute_with_retry(|| {
104 let mut req = self.http.get(url.clone());
105 if !query.is_empty() {
106 req = req.query(query);
107 }
108 req.send()
109 })
110 .await
111 }
112
113 async fn execute_with_retry<F, Fut, R>(&self, mut send: F) -> Result<R>
114 where
115 F: FnMut() -> Fut,
116 Fut: std::future::Future<Output = std::result::Result<reqwest::Response, reqwest::Error>>,
117 R: DeserializeOwned,
118 {
119 let mut attempt = 0u32;
120 loop {
121 attempt += 1;
122 let res = send().await;
123 let resp = match res {
124 Ok(r) => r,
125 Err(e) => {
126 if attempt < MAX_RETRIES && (e.is_timeout() || e.is_connect()) {
127 let backoff = backoff_for(attempt);
128 warn!(attempt, ?backoff, error=%e, "transient network error; retrying");
129 tokio::time::sleep(backoff).await;
130 continue;
131 }
132 return Err(ApiError::from(e));
133 }
134 };
135
136 let status = resp.status();
137 if status.is_success() {
138 let text = resp.text().await?;
139 debug!(bytes = text.len(), "response body");
140 return Ok(serde_json::from_str::<R>(&text)?);
141 }
142
143 let retry_after = resp
144 .headers()
145 .get("retry-after")
146 .and_then(|v| v.to_str().ok())
147 .and_then(|v| v.parse::<u64>().ok());
148
149 let code = status.as_u16();
150 let body = resp.text().await.unwrap_or_default();
151
152 match code {
153 401 | 403 => return Err(ApiError::Auth),
154 404 => return Err(ApiError::NotFound(body)),
155 429 => {
156 if attempt < MAX_RETRIES {
157 let wait = retry_after.map(Duration::from_secs).unwrap_or(backoff_for(attempt));
158 warn!(attempt, ?wait, "rate limited; retrying");
159 tokio::time::sleep(wait).await;
160 continue;
161 }
162 return Err(ApiError::RateLimited {
163 retry_after_secs: retry_after.unwrap_or(0),
164 });
165 }
166 500..=599 => {
167 if attempt < MAX_RETRIES {
168 let wait = backoff_for(attempt);
169 warn!(attempt, status=code, ?wait, "upstream 5xx; retrying");
170 tokio::time::sleep(wait).await;
171 continue;
172 }
173 return Err(ApiError::Upstream { status: code, body });
174 }
175 _ => return Err(ApiError::Upstream { status: code, body }),
176 }
177 }
178 }
179}
180
181fn backoff_for(attempt: u32) -> Duration {
182 Duration::from_millis(500u64 * 2u64.pow(attempt.saturating_sub(1)))
184}