1use std::time::Duration;
8
9use anyhow::{Context, Result};
10use reqwest::Client;
11
12use crate::datadog::auth::{base_url_for_site, DatadogCredentials};
13use crate::datadog::error::DatadogError;
14
/// Timeout handed to the `reqwest` client builder in [`DatadogClient::new`].
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

/// Number of times a request is re-sent after an HTTP 429 response.
///
/// The retry loops run `0..=MAX_RETRIES`, so a request is sent at most
/// `MAX_RETRIES + 1` times in total.
const MAX_RETRIES: u32 = 3;

/// Base (in seconds) of the exponential fallback delay used when a 429
/// response carries neither a parseable `Retry-After` nor a parseable
/// `X-RateLimit-Reset` header: the wait is this value raised to `attempt + 1`.
const DEFAULT_RETRY_DELAY_SECS: u64 = 2;
24
25#[derive(Debug)]
27pub struct DatadogClient {
28 client: Client,
29 base_url: String,
30 api_key: String,
31 app_key: String,
32}
33
34impl DatadogClient {
35 pub fn new(base_url: &str, api_key: &str, app_key: &str) -> Result<Self> {
41 let client = Client::builder()
42 .timeout(REQUEST_TIMEOUT)
43 .build()
44 .context("Failed to build HTTP client")?;
45
46 Ok(Self {
47 client,
48 base_url: base_url.trim_end_matches('/').to_string(),
49 api_key: api_key.to_string(),
50 app_key: app_key.to_string(),
51 })
52 }
53
54 pub fn from_credentials(creds: &DatadogCredentials) -> Result<Self> {
60 let base_url = std::env::var(crate::datadog::auth::DATADOG_API_URL)
61 .ok()
62 .filter(|s| !s.is_empty())
63 .unwrap_or_else(|| base_url_for_site(&creds.site));
64 Self::new(&base_url, &creds.api_key, &creds.app_key)
65 }
66
67 #[must_use]
69 pub fn base_url(&self) -> &str {
70 &self.base_url
71 }
72
73 pub async fn get_json(&self, url: &str) -> Result<reqwest::Response> {
75 for attempt in 0..=MAX_RETRIES {
76 let response = self
77 .client
78 .get(url)
79 .header("DD-API-KEY", &self.api_key)
80 .header("DD-APPLICATION-KEY", &self.app_key)
81 .header("Accept", "application/json")
82 .send()
83 .await
84 .context("Failed to send GET request to Datadog API")?;
85
86 if response.status().as_u16() != 429 || attempt == MAX_RETRIES {
87 return Ok(response);
88 }
89 Self::wait_for_retry(&response, attempt).await;
90 }
91 unreachable!()
92 }
93
94 pub async fn post_json<T: serde::Serialize + Sync + ?Sized>(
96 &self,
97 url: &str,
98 body: &T,
99 ) -> Result<reqwest::Response> {
100 for attempt in 0..=MAX_RETRIES {
101 let response = self
102 .client
103 .post(url)
104 .header("DD-API-KEY", &self.api_key)
105 .header("DD-APPLICATION-KEY", &self.app_key)
106 .header("Content-Type", "application/json")
107 .header("Accept", "application/json")
108 .json(body)
109 .send()
110 .await
111 .context("Failed to send POST request to Datadog API")?;
112
113 if response.status().as_u16() != 429 || attempt == MAX_RETRIES {
114 return Ok(response);
115 }
116 Self::wait_for_retry(&response, attempt).await;
117 }
118 unreachable!()
119 }
120
121 pub async fn response_to_error(response: reqwest::Response) -> DatadogError {
127 let status = response.status().as_u16();
128 let headers = response.headers().clone();
129 let body = response.text().await.unwrap_or_default();
130 let body = if status == 429 {
131 match format_rate_limit(&headers) {
132 Some(suffix) => format!("{body} {suffix}").trim().to_string(),
133 None => body,
134 }
135 } else {
136 body
137 };
138 DatadogError::ApiRequestFailed { status, body }
139 }
140
141 async fn wait_for_retry(response: &reqwest::Response, attempt: u32) {
146 let headers = response.headers();
147 let delay = header_u64(headers, "Retry-After")
148 .or_else(|| header_u64(headers, "X-RateLimit-Reset"))
149 .unwrap_or_else(|| DEFAULT_RETRY_DELAY_SECS.pow(attempt + 1));
150
151 eprintln!(
152 "Rate limited (429). Retrying in {delay}s (attempt {})...",
153 attempt + 1
154 );
155 tokio::time::sleep(Duration::from_secs(delay)).await;
156 }
157}
158
159fn header_u64(headers: &reqwest::header::HeaderMap, name: &str) -> Option<u64> {
160 headers
161 .get(name)
162 .and_then(|v| v.to_str().ok())
163 .and_then(|s| s.parse::<u64>().ok())
164}
165
166fn format_rate_limit(headers: &reqwest::header::HeaderMap) -> Option<String> {
167 let remaining = headers
168 .get("X-RateLimit-Remaining")
169 .and_then(|v| v.to_str().ok());
170 let reset = headers
171 .get("X-RateLimit-Reset")
172 .and_then(|v| v.to_str().ok());
173 let limit = headers
174 .get("X-RateLimit-Limit")
175 .and_then(|v| v.to_str().ok());
176
177 if remaining.is_none() && reset.is_none() && limit.is_none() {
178 return None;
179 }
180
181 let mut parts = Vec::new();
182 if let Some(v) = remaining {
183 parts.push(format!("remaining={v}"));
184 }
185 if let Some(v) = limit {
186 parts.push(format!("limit={v}"));
187 }
188 if let Some(v) = reset {
189 parts.push(format!("reset_in={v}s"));
190 }
191 Some(format!("[rate-limit: {}]", parts.join(", ")))
192}
193
#[cfg(test)]
// Tests are allowed to unwrap: a failed unwrap is a failed test.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    use wiremock::matchers::{body_json, header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Path every mock in this module is mounted on.
    const TEST_PATH: &str = "/test";

    /// Credentials with placeholder keys for the given Datadog site.
    fn credentials_for(site: &str) -> DatadogCredentials {
        DatadogCredentials {
            api_key: "api".to_string(),
            app_key: "app".to_string(),
            site: site.to_string(),
        }
    }

    /// Full URL of the test endpoint on `server`.
    fn endpoint(server: &MockServer) -> String {
        format!("{}{}", server.uri(), TEST_PATH)
    }

    /// Client pointed at `server` with the given keys.
    fn client_for(server: &MockServer, api_key: &str, app_key: &str) -> DatadogClient {
        DatadogClient::new(&server.uri(), api_key, app_key).unwrap()
    }

    /// A 429 response whose `Retry-After: 0` keeps retries instantaneous.
    fn rate_limited() -> ResponseTemplate {
        ResponseTemplate::new(429).append_header("Retry-After", "0")
    }

    /// A 200 response with a small JSON body.
    fn ok_json() -> ResponseTemplate {
        ResponseTemplate::new(200).set_body_json(serde_json::json!({"ok": true}))
    }

    /// Mounts a mock on the test path that answers at most one request.
    async fn mount_once(server: &MockServer, verb: &'static str, response: ResponseTemplate) {
        Mock::given(method(verb))
            .and(path(TEST_PATH))
            .respond_with(response)
            .up_to_n_times(1)
            .mount(server)
            .await;
    }

    /// Mounts a mock on the test path that answers every `GET`.
    async fn mount_get(server: &MockServer, response: ResponseTemplate) {
        Mock::given(method("GET"))
            .and(path(TEST_PATH))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Issues a `GET` against the test endpoint with placeholder keys.
    async fn get_test_endpoint(server: &MockServer) -> reqwest::Response {
        client_for(server, "api", "app")
            .get_json(&endpoint(server))
            .await
            .unwrap()
    }

    #[test]
    fn new_client_strips_trailing_slash() {
        let client = DatadogClient::new("https://api.datadoghq.com/", "api", "app").unwrap();
        assert_eq!(client.base_url(), "https://api.datadoghq.com");
    }

    #[test]
    fn new_client_preserves_clean_url() {
        let client = DatadogClient::new("https://api.datadoghq.com", "api", "app").unwrap();
        assert_eq!(client.base_url(), "https://api.datadoghq.com");
    }

    #[test]
    fn from_credentials_builds_base_url_from_site() {
        let _guard = crate::datadog::test_support::EnvGuard::take();
        std::env::remove_var(crate::datadog::auth::DATADOG_API_URL);
        let client = DatadogClient::from_credentials(&credentials_for("us5.datadoghq.com")).unwrap();
        assert_eq!(client.base_url(), "https://api.us5.datadoghq.com");
    }

    #[test]
    fn from_credentials_honours_api_url_override() {
        let _guard = crate::datadog::test_support::EnvGuard::take();
        std::env::set_var(
            crate::datadog::auth::DATADOG_API_URL,
            "http://proxy.example:8080",
        );
        let client = DatadogClient::from_credentials(&credentials_for("us5.datadoghq.com")).unwrap();
        assert_eq!(client.base_url(), "http://proxy.example:8080");
    }

    #[test]
    fn from_credentials_ignores_empty_api_url_override() {
        let _guard = crate::datadog::test_support::EnvGuard::take();
        std::env::set_var(crate::datadog::auth::DATADOG_API_URL, "");
        let client = DatadogClient::from_credentials(&credentials_for("datadoghq.com")).unwrap();
        assert_eq!(client.base_url(), "https://api.datadoghq.com");
    }

    #[tokio::test]
    async fn get_json_sends_auth_headers() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path(TEST_PATH))
            .and(header("DD-API-KEY", "my-api"))
            .and(header("DD-APPLICATION-KEY", "my-app"))
            .and(header("Accept", "application/json"))
            .respond_with(ok_json())
            .expect(1)
            .mount(&server)
            .await;

        let resp = client_for(&server, "my-api", "my-app")
            .get_json(&endpoint(&server))
            .await
            .unwrap();
        assert!(resp.status().is_success());
    }

    #[tokio::test]
    async fn post_json_sends_body_and_auth() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path(TEST_PATH))
            .and(header("DD-API-KEY", "my-api"))
            .and(header("DD-APPLICATION-KEY", "my-app"))
            .and(header("Content-Type", "application/json"))
            .and(body_json(serde_json::json!({"query": "hello"})))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({"id": "1"})))
            .expect(1)
            .mount(&server)
            .await;

        let body = serde_json::json!({"query": "hello"});
        let resp = client_for(&server, "my-api", "my-app")
            .post_json(&endpoint(&server), &body)
            .await
            .unwrap();
        assert!(resp.status().is_success());
    }

    #[tokio::test]
    async fn get_json_retries_on_429_via_retry_after() {
        let server = MockServer::start().await;
        mount_once(&server, "GET", rate_limited()).await;
        mount_once(&server, "GET", ok_json()).await;

        let resp = get_test_endpoint(&server).await;
        assert!(resp.status().is_success());
    }

    #[tokio::test]
    async fn get_json_retries_on_429_via_x_ratelimit_reset() {
        let server = MockServer::start().await;
        mount_once(
            &server,
            "GET",
            ResponseTemplate::new(429).append_header("X-RateLimit-Reset", "0"),
        )
        .await;
        mount_once(&server, "GET", ok_json()).await;

        let resp = get_test_endpoint(&server).await;
        assert!(resp.status().is_success());
    }

    #[tokio::test]
    async fn post_json_retries_on_429() {
        let server = MockServer::start().await;
        mount_once(&server, "POST", rate_limited()).await;
        mount_once(&server, "POST", ResponseTemplate::new(201)).await;

        let resp = client_for(&server, "api", "app")
            .post_json(&endpoint(&server), &serde_json::json!({"k": "v"}))
            .await
            .unwrap();
        assert_eq!(resp.status().as_u16(), 201);
    }

    #[tokio::test]
    async fn get_json_returns_429_after_max_retries() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path(TEST_PATH))
            .respond_with(rate_limited())
            // One initial request plus MAX_RETRIES retries. Without this
            // expectation a client that never retried would also pass.
            .expect(u64::from(MAX_RETRIES) + 1)
            .mount(&server)
            .await;

        let resp = get_test_endpoint(&server).await;
        assert_eq!(resp.status().as_u16(), 429);
    }

    #[tokio::test]
    async fn response_to_error_surfaces_rate_limit_headers_on_429() {
        let server = MockServer::start().await;
        mount_get(
            &server,
            rate_limited()
                .append_header("X-RateLimit-Remaining", "0")
                .append_header("X-RateLimit-Reset", "42")
                .append_header("X-RateLimit-Limit", "100")
                .set_body_string("too many"),
        )
        .await;

        let resp = get_test_endpoint(&server).await;
        let msg = DatadogClient::response_to_error(resp).await.to_string();
        assert!(msg.contains("429"));
        assert!(msg.contains("too many"));
        assert!(msg.contains("remaining=0"));
        assert!(msg.contains("limit=100"));
        assert!(msg.contains("reset_in=42s"));
    }

    #[tokio::test]
    async fn response_to_error_does_not_add_rate_limit_suffix_on_non_429() {
        let server = MockServer::start().await;
        mount_get(
            &server,
            ResponseTemplate::new(401).set_body_string("Unauthorized"),
        )
        .await;

        let resp = get_test_endpoint(&server).await;
        let msg = DatadogClient::response_to_error(resp).await.to_string();
        assert!(msg.contains("401"));
        assert!(msg.contains("Unauthorized"));
        assert!(!msg.contains("rate-limit"));
    }

    #[tokio::test]
    async fn response_to_error_omits_suffix_when_no_rate_limit_headers() {
        let server = MockServer::start().await;
        mount_get(&server, rate_limited().set_body_string("slow down")).await;

        let resp = get_test_endpoint(&server).await;
        let msg = DatadogClient::response_to_error(resp).await.to_string();
        assert!(msg.contains("slow down"));
        assert!(!msg.contains("rate-limit"));
    }
}