datadog_api/
client.rs

1use crate::rate_limit::{RateLimitConfig, RateLimiter};
2use crate::{config::DatadogConfig, error::Error, Result};
3use reqwest::{header, Client, Response};
4use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, RequestBuilder};
5use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
6use serde::de::DeserializeOwned;
7use std::time::Duration;
8use tracing::{debug, error, trace};
9
10fn sanitize_log_message(message: &str) -> String {
11    use regex::Regex;
12
13    let key_pattern = r#"dd-api-key|dd-application-key|DD_API_KEY|DD_APP_KEY|api_key|app_key|apikey|appkey"#;
14
15    let patterns = [
16        // JSON style: "api_key": "value" or "api_key":"value"
17        format!(r#"(?i)"({key_pattern})"\s*:\s*"([^"]*)""#),
18        // Header/env style with quoted value: api_key: "value" or api_key="value"
19        format!(r#"(?i)({key_pattern})\s*[:=]\s*"([^"]*)""#),
20        // Header/env style with single-quoted value
21        format!(r#"(?i)({key_pattern})\s*[:=]\s*'([^']*)'"#),
22        // Header/env style with unquoted value
23        format!(r#"(?i)({key_pattern})\s*[:=]\s*([^\s,}}"'\n]+)"#),
24    ];
25
26    let mut result = message.to_string();
27    for pattern in patterns {
28        if let Ok(re) = Regex::new(&pattern) {
29            result = re.replace_all(&result, "\"$1\": \"[REDACTED]\"").to_string();
30        }
31    }
32    result
33}
34
35/// HTTP client for interacting with the Datadog API.
36///
37/// Handles authentication, request building, and response parsing for all Datadog API endpoints.
38/// Includes client-side rate limiting to prevent hitting Datadog's API limits.
39#[derive(Clone)]
40pub struct DatadogClient {
41    client: ClientWithMiddleware,
42    config: DatadogConfig,
43    rate_limiter: RateLimiter,
44}
45
46impl DatadogClient {
47    /// Creates a new Datadog API client with the given configuration.
48    ///
49    /// # Errors
50    ///
51    /// Returns an error if the HTTP client cannot be built.
52    pub fn new(config: DatadogConfig) -> Result<Self> {
53        Self::with_rate_limit(config, RateLimitConfig::default())
54    }
55
56    /// Creates a new Datadog API client with custom rate limiting configuration.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if the HTTP client cannot be built.
61    pub fn with_rate_limit(config: DatadogConfig, rate_limit_config: RateLimitConfig) -> Result<Self> {
62        let retry_policy = ExponentialBackoff::builder()
63            .retry_bounds(
64                Duration::from_millis(config.retry_config.initial_backoff_ms),
65                Duration::from_millis(config.retry_config.max_backoff_ms),
66            )
67            .build_with_max_retries(config.retry_config.max_retries);
68
69        let retry_middleware = RetryTransientMiddleware::new_with_policy(retry_policy);
70
71        let http_config = &config.http_config;
72        let mut builder = Client::builder()
73            .timeout(Duration::from_secs(http_config.timeout_secs))
74            .pool_max_idle_per_host(http_config.pool_max_idle_per_host)
75            .pool_idle_timeout(Duration::from_secs(http_config.pool_idle_timeout_secs))
76            .gzip(true);
77
78        if let Some(keepalive_secs) = http_config.tcp_keepalive_secs {
79            builder = builder.tcp_keepalive(Duration::from_secs(keepalive_secs));
80        }
81
82        let base_client = builder.build().map_err(Error::HttpError)?;
83
84        let client = ClientBuilder::new(base_client)
85            .with(retry_middleware)
86            .build();
87
88        let rate_limiter = RateLimiter::new(rate_limit_config);
89
90        Ok(Self {
91            client,
92            config,
93            rate_limiter,
94        })
95    }
96
97    /// Returns a reference to the configuration used by this client.
98    #[must_use]
99    pub fn config(&self) -> &DatadogConfig {
100        &self.config
101    }
102
103    /// Checks if an endpoint corresponds to an unstable operation.
104    fn is_unstable_operation(&self, endpoint: &str) -> bool {
105        self.config
106            .unstable_operations
107            .iter()
108            .any(|op| endpoint.contains(op))
109    }
110
111    fn build_headers(&self, endpoint: Option<&str>) -> Result<header::HeaderMap> {
112        let mut headers = header::HeaderMap::new();
113
114        headers.insert(
115            header::HeaderName::from_static("dd-api-key"),
116            header::HeaderValue::from_str(self.config.api_key.expose())
117                .map_err(|e| Error::ConfigError(format!("Invalid API key: {e}")))?,
118        );
119
120        headers.insert(
121            header::HeaderName::from_static("dd-application-key"),
122            header::HeaderValue::from_str(self.config.app_key.expose())
123                .map_err(|e| Error::ConfigError(format!("Invalid app key: {e}")))?,
124        );
125
126        headers.insert(
127            header::CONTENT_TYPE,
128            header::HeaderValue::from_static("application/json"),
129        );
130
131        headers.insert(
132            header::USER_AGENT,
133            header::HeaderValue::from_static("datadog-mcp/0.1.0"),
134        );
135
136        headers.insert(
137            header::ACCEPT_ENCODING,
138            header::HeaderValue::from_static("gzip"),
139        );
140
141        // Add unstable operation header if needed
142        if let Some(endpoint) = endpoint {
143            if self.is_unstable_operation(endpoint) {
144                headers.insert(
145                    header::HeaderName::from_static("dd-operation-unstable"),
146                    header::HeaderValue::from_static("true"),
147                );
148            }
149        }
150
151        Ok(headers)
152    }
153
154    fn add_auth_headers(&self, builder: RequestBuilder, endpoint: &str) -> Result<RequestBuilder> {
155        Ok(builder.headers(self.build_headers(Some(endpoint))?))
156    }
157
158    async fn handle_response<T: DeserializeOwned>(&self, response: Response) -> Result<T> {
159        let status = response.status();
160
161        if status.is_success() {
162            trace!("Successful response with status: {status}");
163            response.json::<T>().await.map_err(Error::HttpError)
164        } else {
165            let status_code = status.as_u16();
166            let error_body = response.text().await.unwrap_or_else(|e| {
167                debug!("Failed to read error response body: {e}");
168                format!("(failed to read error body: {e})")
169            });
170
171            let sanitized_body = sanitize_log_message(&error_body);
172            error!("API error: {status_code} - {sanitized_body}");
173
174            Err(Error::ApiError {
175                status: status_code,
176                message: sanitized_body,
177            })
178        }
179    }
180
181    pub async fn get<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
182        self.rate_limiter.acquire().await;
183
184        let url = format!("{}{}", self.config.base_url(), endpoint);
185        debug!("GET {url}");
186
187        let request = self.client.get(&url);
188        let request = self.add_auth_headers(request, endpoint)?;
189
190        let response = request.send().await.map_err(Error::MiddlewareError)?;
191
192        self.handle_response(response).await
193    }
194
195    pub async fn get_with_query<T: DeserializeOwned, Q: serde::Serialize>(
196        &self,
197        endpoint: &str,
198        query: &Q,
199    ) -> Result<T> {
200        self.rate_limiter.acquire().await;
201
202        let url = format!("{}{}", self.config.base_url(), endpoint);
203
204        let request = self.client.get(&url).query(query);
205        let request = self.add_auth_headers(request, endpoint)?;
206
207        let response = request.send().await.map_err(Error::MiddlewareError)?;
208
209        debug!("Response status: {}", response.status());
210        self.handle_response(response).await
211    }
212
213    pub async fn post<T: DeserializeOwned, B: serde::Serialize>(
214        &self,
215        endpoint: &str,
216        body: &B,
217    ) -> Result<T> {
218        self.rate_limiter.acquire().await;
219
220        let url = format!("{}{}", self.config.base_url(), endpoint);
221        debug!("POST {url}");
222
223        let json_body = serde_json::to_string(body).map_err(Error::JsonError)?;
224        let request = self
225            .client
226            .post(&url)
227            .body(json_body)
228            .header(header::CONTENT_TYPE, "application/json");
229        let request = self.add_auth_headers(request, endpoint)?;
230
231        let response = request.send().await.map_err(Error::MiddlewareError)?;
232        self.handle_response(response).await
233    }
234
235    pub async fn put<T: DeserializeOwned, B: serde::Serialize>(
236        &self,
237        endpoint: &str,
238        body: &B,
239    ) -> Result<T> {
240        self.rate_limiter.acquire().await;
241
242        let url = format!("{}{}", self.config.base_url(), endpoint);
243        debug!("PUT {url}");
244
245        let json_body = serde_json::to_string(body).map_err(Error::JsonError)?;
246        let request = self
247            .client
248            .put(&url)
249            .body(json_body)
250            .header(header::CONTENT_TYPE, "application/json");
251        let request = self.add_auth_headers(request, endpoint)?;
252
253        let response = request.send().await.map_err(Error::MiddlewareError)?;
254        self.handle_response(response).await
255    }
256
257    pub async fn delete(&self, endpoint: &str) -> Result<()> {
258        self.rate_limiter.acquire().await;
259
260        let url = format!("{}{}", self.config.base_url(), endpoint);
261        debug!("DELETE {url}");
262
263        let request = self.client.delete(&url);
264        let request = self.add_auth_headers(request, endpoint)?;
265
266        let response = request.send().await.map_err(Error::MiddlewareError)?;
267
268        let status = response.status();
269        if status.is_success() {
270            Ok(())
271        } else {
272            let status_code = status.as_u16();
273            let error_body = response.text().await.unwrap_or_else(|e| {
274                debug!("Failed to read error response body: {e}");
275                format!("(failed to read error body: {e})")
276            });
277
278            let sanitized_body = sanitize_log_message(&error_body);
279            error!("API error: {} - {}", status_code, sanitized_body);
280
281            Err(Error::ApiError {
282                status: status_code,
283                message: sanitized_body,
284            })
285        }
286    }
287
288    pub async fn delete_with_response<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
289        self.rate_limiter.acquire().await;
290
291        let url = format!("{}{}", self.config.base_url(), endpoint);
292        debug!("DELETE {url}");
293
294        let request = self.client.delete(&url);
295        let request = self.add_auth_headers(request, endpoint)?;
296
297        let response = request.send().await.map_err(Error::MiddlewareError)?;
298        self.handle_response(response).await
299    }
300
301    /// Returns a reference to the rate limiter (for monitoring)
302    #[must_use]
303    pub fn rate_limiter(&self) -> &RateLimiter {
304        &self.rate_limiter
305    }
306
307    /// GET request with conditional caching support (ETag/Last-Modified).
308    ///
309    /// Returns `Ok(Some(response))` if data was modified, or `Ok(None)` if
310    /// the cached version is still valid (304 Not Modified).
311    ///
312    /// # Arguments
313    ///
314    /// * `endpoint` - API endpoint
315    /// * `cache_info` - Optional cache info from a previous response
316    ///
317    /// # Example
318    ///
319    /// ```no_run
320    /// # use datadog_api::{DatadogClient, DatadogConfig, CachedResponse};
321    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
322    /// let client = DatadogClient::new(DatadogConfig::from_env()?)?;
323    ///
324    /// // First request - no cache
325    /// let response: CachedResponse<serde_json::Value> = client
326    ///     .get_cached("/api/v1/monitor", None)
327    ///     .await?
328    ///     .expect("First request should return data");
329    ///
330    /// // Subsequent request with cache info
331    /// match client.get_cached::<serde_json::Value>("/api/v1/monitor", Some(&response.cache_info)).await? {
332    ///     Some(new_response) => println!("Data was modified"),
333    ///     None => println!("Data unchanged, use cached version"),
334    /// }
335    /// # Ok(())
336    /// # }
337    /// ```
338    pub async fn get_cached<T: DeserializeOwned>(
339        &self,
340        endpoint: &str,
341        cache_info: Option<&CacheInfo>,
342    ) -> Result<Option<CachedResponse<T>>> {
343        self.rate_limiter.acquire().await;
344
345        let url = format!("{}{}", self.config.base_url(), endpoint);
346        debug!("GET (cached) {url}");
347
348        let mut request = self.client.get(&url);
349
350        // Add conditional headers if we have cache info
351        if let Some(info) = cache_info {
352            if let Some(etag) = &info.etag {
353                request = request.header(header::IF_NONE_MATCH, etag.as_str());
354            }
355            if let Some(last_modified) = &info.last_modified {
356                request = request.header(header::IF_MODIFIED_SINCE, last_modified.as_str());
357            }
358        }
359
360        let request = self.add_auth_headers(request, endpoint)?;
361        let response = request.send().await.map_err(Error::MiddlewareError)?;
362
363        // 304 Not Modified - cached data is still valid
364        if response.status() == reqwest::StatusCode::NOT_MODIFIED {
365            debug!("304 Not Modified - using cached data");
366            return Ok(None);
367        }
368
369        // Extract cache headers before consuming the response
370        let new_cache_info = CacheInfo {
371            etag: response
372                .headers()
373                .get(header::ETAG)
374                .and_then(|v| v.to_str().ok())
375                .map(String::from),
376            last_modified: response
377                .headers()
378                .get(header::LAST_MODIFIED)
379                .and_then(|v| v.to_str().ok())
380                .map(String::from),
381        };
382
383        let data: T = self.handle_response(response).await?;
384
385        Ok(Some(CachedResponse {
386            data,
387            cache_info: new_cache_info,
388        }))
389    }
390}
391
392/// Cache validation information from HTTP headers
393#[derive(Debug, Clone, Default)]
394pub struct CacheInfo {
395    /// ETag header value for conditional requests
396    pub etag: Option<String>,
397    /// Last-Modified header value for conditional requests
398    pub last_modified: Option<String>,
399}
400
401impl CacheInfo {
402    /// Check if any cache validation info is available
403    #[must_use]
404    pub fn has_validators(&self) -> bool {
405        self.etag.is_some() || self.last_modified.is_some()
406    }
407}
408
409/// Response with cache validation information
410#[derive(Debug, Clone)]
411pub struct CachedResponse<T> {
412    /// The response data
413    pub data: T,
414    /// Cache information for subsequent conditional requests
415    pub cache_info: CacheInfo,
416}
417
418#[cfg(test)]
419mod tests {
420    use super::*;
421
422    #[test]
423    fn test_sanitize_json_api_key() {
424        let input = r#"{"error": "Invalid api_key: abc123secret"}"#;
425        let output = sanitize_log_message(input);
426        assert!(!output.contains("abc123secret"));
427        assert!(output.contains("[REDACTED]"));
428    }
429
430    #[test]
431    fn test_sanitize_header_style() {
432        let input = "dd-api-key: secret123abc";
433        let output = sanitize_log_message(input);
434        assert!(!output.contains("secret123abc"));
435        assert!(output.contains("[REDACTED]"));
436    }
437
438    #[test]
439    fn test_sanitize_env_var_style() {
440        let input = "DD_API_KEY=mysecretkey and DD_APP_KEY=anothersecret";
441        let output = sanitize_log_message(input);
442        assert!(!output.contains("mysecretkey"));
443        assert!(!output.contains("anothersecret"));
444    }
445
446    #[test]
447    fn test_sanitize_quoted_value() {
448        let input = r#"{"api_key": "secret_value_here", "other": "data"}"#;
449        let output = sanitize_log_message(input);
450        assert!(!output.contains("secret_value_here"));
451        assert!(output.contains("other"));
452    }
453
454    #[test]
455    fn test_sanitize_no_secrets() {
456        let input = "This is a normal error message without any secrets";
457        let output = sanitize_log_message(input);
458        assert_eq!(input, output);
459    }
460
461    #[test]
462    fn test_sanitize_case_insensitive() {
463        let input = "API_KEY=secret123";
464        let output = sanitize_log_message(input);
465        assert!(!output.contains("secret123"));
466    }
467
468    #[test]
469    fn test_cache_info_default() {
470        let info = CacheInfo::default();
471        assert!(info.etag.is_none());
472        assert!(info.last_modified.is_none());
473        assert!(!info.has_validators());
474    }
475
476    #[test]
477    fn test_cache_info_with_etag() {
478        let info = CacheInfo {
479            etag: Some("\"abc123\"".to_string()),
480            last_modified: None,
481        };
482        assert!(info.has_validators());
483    }
484
485    #[test]
486    fn test_cache_info_with_last_modified() {
487        let info = CacheInfo {
488            etag: None,
489            last_modified: Some("Wed, 21 Oct 2025 07:28:00 GMT".to_string()),
490        };
491        assert!(info.has_validators());
492    }
493
494    #[test]
495    fn test_cached_response() {
496        let response = CachedResponse {
497            data: vec![1, 2, 3],
498            cache_info: CacheInfo {
499                etag: Some("\"test-etag\"".to_string()),
500                last_modified: Some("Wed, 21 Oct 2025 07:28:00 GMT".to_string()),
501            },
502        };
503        assert_eq!(response.data, vec![1, 2, 3]);
504        assert!(response.cache_info.has_validators());
505    }
506}