Skip to main content

omni_dev/datadog/
client.rs

1//! Datadog REST API client.
2//!
3//! Thin `reqwest` wrapper that injects the `DD-API-KEY` and
4//! `DD-APPLICATION-KEY` headers on every request and retries 429 responses
5//! with `Retry-After` / `X-RateLimit-Reset` awareness.
6
7use std::time::Duration;
8
9use anyhow::{Context, Result};
10use reqwest::Client;
11
12use crate::datadog::auth::{base_url_for_site, DatadogCredentials};
13use crate::datadog::error::DatadogError;
14
/// Per-request timeout applied to the underlying `reqwest` client.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

/// How many times a request is re-sent after an HTTP 429 (Too Many
/// Requests) before the 429 response is handed back to the caller.
const MAX_RETRIES: u32 = 3;

/// Base, in seconds, of the exponential backoff used when a 429 response
/// carries neither a usable `Retry-After` nor `X-RateLimit-Reset` header.
const DEFAULT_RETRY_DELAY_SECS: u64 = 2;
24
25/// HTTP client for Datadog REST APIs.
26#[derive(Debug)]
27pub struct DatadogClient {
28    client: Client,
29    base_url: String,
30    api_key: String,
31    app_key: String,
32}
33
34impl DatadogClient {
35    /// Creates a new Datadog API client.
36    ///
37    /// `base_url` should be the full API host, e.g. `https://api.datadoghq.com`.
38    /// For production use, construct via [`Self::from_credentials`]; tests
39    /// pass a wiremock URL directly.
40    pub fn new(base_url: &str, api_key: &str, app_key: &str) -> Result<Self> {
41        let client = Client::builder()
42            .timeout(REQUEST_TIMEOUT)
43            .build()
44            .context("Failed to build HTTP client")?;
45
46        Ok(Self {
47            client,
48            base_url: base_url.trim_end_matches('/').to_string(),
49            api_key: api_key.to_string(),
50            app_key: app_key.to_string(),
51        })
52    }
53
54    /// Creates a client from stored credentials.
55    ///
56    /// Respects `DATADOG_API_URL` as an optional override: when set in the
57    /// process environment it replaces the site-derived base URL. Used for
58    /// tests (wiremock) and on-prem Datadog installs.
59    pub fn from_credentials(creds: &DatadogCredentials) -> Result<Self> {
60        let base_url = std::env::var(crate::datadog::auth::DATADOG_API_URL)
61            .ok()
62            .filter(|s| !s.is_empty())
63            .unwrap_or_else(|| base_url_for_site(&creds.site));
64        Self::new(&base_url, &creds.api_key, &creds.app_key)
65    }
66
67    /// Returns the API base URL (without trailing slash).
68    #[must_use]
69    pub fn base_url(&self) -> &str {
70        &self.base_url
71    }
72
73    /// Sends an authenticated GET request and returns the raw response.
74    pub async fn get_json(&self, url: &str) -> Result<reqwest::Response> {
75        for attempt in 0..=MAX_RETRIES {
76            let response = self
77                .client
78                .get(url)
79                .header("DD-API-KEY", &self.api_key)
80                .header("DD-APPLICATION-KEY", &self.app_key)
81                .header("Accept", "application/json")
82                .send()
83                .await
84                .context("Failed to send GET request to Datadog API")?;
85
86            if response.status().as_u16() != 429 || attempt == MAX_RETRIES {
87                return Ok(response);
88            }
89            Self::wait_for_retry(&response, attempt).await;
90        }
91        unreachable!()
92    }
93
94    /// Sends an authenticated POST request with a JSON body and returns the raw response.
95    pub async fn post_json<T: serde::Serialize + Sync + ?Sized>(
96        &self,
97        url: &str,
98        body: &T,
99    ) -> Result<reqwest::Response> {
100        for attempt in 0..=MAX_RETRIES {
101            let response = self
102                .client
103                .post(url)
104                .header("DD-API-KEY", &self.api_key)
105                .header("DD-APPLICATION-KEY", &self.app_key)
106                .header("Content-Type", "application/json")
107                .header("Accept", "application/json")
108                .json(body)
109                .send()
110                .await
111                .context("Failed to send POST request to Datadog API")?;
112
113            if response.status().as_u16() != 429 || attempt == MAX_RETRIES {
114                return Ok(response);
115            }
116            Self::wait_for_retry(&response, attempt).await;
117        }
118        unreachable!()
119    }
120
121    /// Consumes a non-success response and turns it into a [`DatadogError`].
122    ///
123    /// For 429 responses, appends a human-readable rate-limit summary
124    /// (extracted from `X-RateLimit-*` headers) to the body, so the caller
125    /// sees why the retry loop gave up.
126    pub async fn response_to_error(response: reqwest::Response) -> DatadogError {
127        let status = response.status().as_u16();
128        let headers = response.headers().clone();
129        let body = response.text().await.unwrap_or_default();
130        let body = if status == 429 {
131            match format_rate_limit(&headers) {
132                Some(suffix) => format!("{body} {suffix}").trim().to_string(),
133                None => body,
134            }
135        } else {
136            body
137        };
138        DatadogError::ApiRequestFailed { status, body }
139    }
140
141    /// Waits before retrying a rate-limited request.
142    ///
143    /// Consults, in order: `Retry-After`, then Datadog's `X-RateLimit-Reset`,
144    /// then exponential backoff (`DEFAULT_RETRY_DELAY_SECS ^ (attempt+1)`).
145    async fn wait_for_retry(response: &reqwest::Response, attempt: u32) {
146        let headers = response.headers();
147        let delay = header_u64(headers, "Retry-After")
148            .or_else(|| header_u64(headers, "X-RateLimit-Reset"))
149            .unwrap_or_else(|| DEFAULT_RETRY_DELAY_SECS.pow(attempt + 1));
150
151        eprintln!(
152            "Rate limited (429). Retrying in {delay}s (attempt {})...",
153            attempt + 1
154        );
155        tokio::time::sleep(Duration::from_secs(delay)).await;
156    }
157}
158
159fn header_u64(headers: &reqwest::header::HeaderMap, name: &str) -> Option<u64> {
160    headers
161        .get(name)
162        .and_then(|v| v.to_str().ok())
163        .and_then(|s| s.parse::<u64>().ok())
164}
165
166fn format_rate_limit(headers: &reqwest::header::HeaderMap) -> Option<String> {
167    let remaining = headers
168        .get("X-RateLimit-Remaining")
169        .and_then(|v| v.to_str().ok());
170    let reset = headers
171        .get("X-RateLimit-Reset")
172        .and_then(|v| v.to_str().ok());
173    let limit = headers
174        .get("X-RateLimit-Limit")
175        .and_then(|v| v.to_str().ok());
176
177    if remaining.is_none() && reset.is_none() && limit.is_none() {
178        return None;
179    }
180
181    let mut parts = Vec::new();
182    if let Some(v) = remaining {
183        parts.push(format!("remaining={v}"));
184    }
185    if let Some(v) = limit {
186        parts.push(format!("limit={v}"));
187    }
188    if let Some(v) = reset {
189        parts.push(format!("reset_in={v}s"));
190    }
191    Some(format!("[rate-limit: {}]", parts.join(", ")))
192}
193
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use serde_json::json;
    use wiremock::matchers::{body_json, header, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    use super::*;
    use crate::datadog::auth::DATADOG_API_URL;
    use crate::datadog::test_support::EnvGuard;

    /// Credentials fixture with placeholder keys for the given site.
    fn creds_for(site: &str) -> DatadogCredentials {
        DatadogCredentials {
            api_key: "api".to_string(),
            app_key: "app".to_string(),
            site: site.to_string(),
        }
    }

    /// Client with placeholder keys pointed at the mock server.
    fn client_for(server: &MockServer) -> DatadogClient {
        DatadogClient::new(&server.uri(), "api", "app").unwrap()
    }

    /// Absolute URL of the `/test` endpoint on the mock server.
    fn test_url(server: &MockServer) -> String {
        format!("{}/test", server.uri())
    }

    /// Issues `GET /test` through a placeholder-key client.
    async fn get_test(server: &MockServer) -> reqwest::Response {
        let client = client_for(server);
        let url = test_url(server);
        client.get_json(&url).await.unwrap()
    }

    /// Mounts a mock on `<verb> /test` that answers exactly one request with
    /// `response`. Mocks are consulted in mount order, so mounting a 429 and
    /// then a success yields "rate limited once, then OK". The `expect(1)`
    /// makes the test fail if the mock was never reached.
    async fn mount_once(server: &MockServer, verb: &str, response: ResponseTemplate) {
        Mock::given(method(verb))
            .and(path("/test"))
            .respond_with(response)
            .up_to_n_times(1)
            .expect(1)
            .mount(server)
            .await;
    }

    /// Mounts a mock that answers every `GET /test` with `response`.
    async fn mount_get_always(server: &MockServer, response: ResponseTemplate) {
        Mock::given(method("GET"))
            .and(path("/test"))
            .respond_with(response)
            .mount(server)
            .await;
    }

    #[test]
    fn new_client_strips_trailing_slash() {
        let client = DatadogClient::new("https://api.datadoghq.com/", "api", "app").unwrap();
        assert_eq!(client.base_url(), "https://api.datadoghq.com");
    }

    #[test]
    fn new_client_preserves_clean_url() {
        let client = DatadogClient::new("https://api.datadoghq.com", "api", "app").unwrap();
        assert_eq!(client.base_url(), "https://api.datadoghq.com");
    }

    #[test]
    fn from_credentials_builds_base_url_from_site() {
        // EnvGuard serialises with other tests that mutate DATADOG_API_URL.
        let _guard = EnvGuard::take();
        std::env::remove_var(DATADOG_API_URL);
        let client = DatadogClient::from_credentials(&creds_for("us5.datadoghq.com")).unwrap();
        assert_eq!(client.base_url(), "https://api.us5.datadoghq.com");
    }

    #[test]
    fn from_credentials_honours_api_url_override() {
        let _guard = EnvGuard::take();
        std::env::set_var(DATADOG_API_URL, "http://proxy.example:8080");
        let client = DatadogClient::from_credentials(&creds_for("us5.datadoghq.com")).unwrap();
        assert_eq!(client.base_url(), "http://proxy.example:8080");
    }

    #[test]
    fn from_credentials_ignores_empty_api_url_override() {
        let _guard = EnvGuard::take();
        std::env::set_var(DATADOG_API_URL, "");
        let client = DatadogClient::from_credentials(&creds_for("datadoghq.com")).unwrap();
        assert_eq!(client.base_url(), "https://api.datadoghq.com");
    }

    #[tokio::test]
    async fn get_json_sends_auth_headers() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/test"))
            .and(header("DD-API-KEY", "my-api"))
            .and(header("DD-APPLICATION-KEY", "my-app"))
            .and(header("Accept", "application/json"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({"ok": true})))
            .expect(1)
            .mount(&server)
            .await;

        let client = DatadogClient::new(&server.uri(), "my-api", "my-app").unwrap();
        let resp = client.get_json(&test_url(&server)).await.unwrap();
        assert!(resp.status().is_success());
    }

    #[tokio::test]
    async fn post_json_sends_body_and_auth() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/test"))
            .and(header("DD-API-KEY", "my-api"))
            .and(header("DD-APPLICATION-KEY", "my-app"))
            .and(header("Content-Type", "application/json"))
            .and(body_json(json!({"query": "hello"})))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({"id": "1"})))
            .expect(1)
            .mount(&server)
            .await;

        let client = DatadogClient::new(&server.uri(), "my-api", "my-app").unwrap();
        let body = json!({"query": "hello"});
        let resp = client.post_json(&test_url(&server), &body).await.unwrap();
        assert!(resp.status().is_success());
    }

    #[tokio::test]
    async fn get_json_retries_on_429_via_retry_after() {
        let server = MockServer::start().await;
        mount_once(
            &server,
            "GET",
            ResponseTemplate::new(429).append_header("Retry-After", "0"),
        )
        .await;
        mount_once(
            &server,
            "GET",
            ResponseTemplate::new(200).set_body_json(json!({"ok": true})),
        )
        .await;

        let resp = get_test(&server).await;
        assert!(resp.status().is_success());
    }

    #[tokio::test]
    async fn get_json_retries_on_429_via_x_ratelimit_reset() {
        let server = MockServer::start().await;
        mount_once(
            &server,
            "GET",
            ResponseTemplate::new(429).append_header("X-RateLimit-Reset", "0"),
        )
        .await;
        mount_once(
            &server,
            "GET",
            ResponseTemplate::new(200).set_body_json(json!({"ok": true})),
        )
        .await;

        let resp = get_test(&server).await;
        assert!(resp.status().is_success());
    }

    #[tokio::test]
    async fn post_json_retries_on_429() {
        let server = MockServer::start().await;
        mount_once(
            &server,
            "POST",
            ResponseTemplate::new(429).append_header("Retry-After", "0"),
        )
        .await;
        mount_once(&server, "POST", ResponseTemplate::new(201)).await;

        let client = client_for(&server);
        let resp = client
            .post_json(&test_url(&server), &json!({"k": "v"}))
            .await
            .unwrap();
        assert_eq!(resp.status().as_u16(), 201);
    }

    #[tokio::test]
    async fn get_json_returns_429_after_max_retries() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/test"))
            .respond_with(ResponseTemplate::new(429).append_header("Retry-After", "0"))
            // One initial attempt plus MAX_RETRIES retries, and not one more.
            .expect(u64::from(MAX_RETRIES) + 1)
            .mount(&server)
            .await;

        let resp = get_test(&server).await;
        assert_eq!(resp.status().as_u16(), 429);
    }

    #[tokio::test]
    async fn response_to_error_surfaces_rate_limit_headers_on_429() {
        let server = MockServer::start().await;
        mount_get_always(
            &server,
            ResponseTemplate::new(429)
                .append_header("Retry-After", "0")
                .append_header("X-RateLimit-Remaining", "0")
                .append_header("X-RateLimit-Reset", "42")
                .append_header("X-RateLimit-Limit", "100")
                .set_body_string("too many"),
        )
        .await;

        let resp = get_test(&server).await;
        let msg = DatadogClient::response_to_error(resp).await.to_string();
        assert!(msg.contains("429"));
        assert!(msg.contains("too many"));
        assert!(msg.contains("remaining=0"));
        assert!(msg.contains("limit=100"));
        assert!(msg.contains("reset_in=42s"));
    }

    #[tokio::test]
    async fn response_to_error_does_not_add_rate_limit_suffix_on_non_429() {
        let server = MockServer::start().await;
        mount_get_always(
            &server,
            ResponseTemplate::new(401).set_body_string("Unauthorized"),
        )
        .await;

        let resp = get_test(&server).await;
        let msg = DatadogClient::response_to_error(resp).await.to_string();
        assert!(msg.contains("401"));
        assert!(msg.contains("Unauthorized"));
        assert!(!msg.contains("rate-limit"));
    }

    #[tokio::test]
    async fn response_to_error_omits_suffix_when_no_rate_limit_headers() {
        let server = MockServer::start().await;
        mount_get_always(
            &server,
            ResponseTemplate::new(429)
                .append_header("Retry-After", "0")
                .set_body_string("slow down"),
        )
        .await;

        let resp = get_test(&server).await;
        let msg = DatadogClient::response_to_error(resp).await.to_string();
        assert!(msg.contains("slow down"));
        assert!(!msg.contains("rate-limit"));
    }
}