1use crate::rate_limit::{RateLimitConfig, RateLimiter};
2use crate::{config::DatadogConfig, error::Error, Result};
3use reqwest::{header, Client, Response};
4use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, RequestBuilder};
5use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
6use serde::de::DeserializeOwned;
7use std::time::Duration;
8use tracing::{debug, error, trace};
9
10fn sanitize_log_message(message: &str) -> String {
11 use regex::Regex;
12
13 let key_pattern = r#"dd-api-key|dd-application-key|DD_API_KEY|DD_APP_KEY|api_key|app_key|apikey|appkey"#;
14
15 let patterns = [
16 format!(r#"(?i)"({key_pattern})"\s*:\s*"([^"]*)""#),
18 format!(r#"(?i)({key_pattern})\s*[:=]\s*"([^"]*)""#),
20 format!(r#"(?i)({key_pattern})\s*[:=]\s*'([^']*)'"#),
22 format!(r#"(?i)({key_pattern})\s*[:=]\s*([^\s,}}"'\n]+)"#),
24 ];
25
26 let mut result = message.to_string();
27 for pattern in patterns {
28 if let Ok(re) = Regex::new(&pattern) {
29 result = re.replace_all(&result, "\"$1\": \"[REDACTED]\"").to_string();
30 }
31 }
32 result
33}
34
35#[derive(Clone)]
40pub struct DatadogClient {
41 client: ClientWithMiddleware,
42 config: DatadogConfig,
43 rate_limiter: RateLimiter,
44}
45
46impl DatadogClient {
47 pub fn new(config: DatadogConfig) -> Result<Self> {
53 Self::with_rate_limit(config, RateLimitConfig::default())
54 }
55
56 pub fn with_rate_limit(config: DatadogConfig, rate_limit_config: RateLimitConfig) -> Result<Self> {
62 let retry_policy = ExponentialBackoff::builder()
63 .retry_bounds(
64 Duration::from_millis(config.retry_config.initial_backoff_ms),
65 Duration::from_millis(config.retry_config.max_backoff_ms),
66 )
67 .build_with_max_retries(config.retry_config.max_retries);
68
69 let retry_middleware = RetryTransientMiddleware::new_with_policy(retry_policy);
70
71 let http_config = &config.http_config;
72 let mut builder = Client::builder()
73 .timeout(Duration::from_secs(http_config.timeout_secs))
74 .pool_max_idle_per_host(http_config.pool_max_idle_per_host)
75 .pool_idle_timeout(Duration::from_secs(http_config.pool_idle_timeout_secs))
76 .gzip(true);
77
78 if let Some(keepalive_secs) = http_config.tcp_keepalive_secs {
79 builder = builder.tcp_keepalive(Duration::from_secs(keepalive_secs));
80 }
81
82 let base_client = builder.build().map_err(Error::HttpError)?;
83
84 let client = ClientBuilder::new(base_client)
85 .with(retry_middleware)
86 .build();
87
88 let rate_limiter = RateLimiter::new(rate_limit_config);
89
90 Ok(Self {
91 client,
92 config,
93 rate_limiter,
94 })
95 }
96
97 #[must_use]
99 pub fn config(&self) -> &DatadogConfig {
100 &self.config
101 }
102
103 fn is_unstable_operation(&self, endpoint: &str) -> bool {
105 self.config
106 .unstable_operations
107 .iter()
108 .any(|op| endpoint.contains(op))
109 }
110
111 fn build_headers(&self, endpoint: Option<&str>) -> Result<header::HeaderMap> {
112 let mut headers = header::HeaderMap::new();
113
114 headers.insert(
115 header::HeaderName::from_static("dd-api-key"),
116 header::HeaderValue::from_str(self.config.api_key.expose())
117 .map_err(|e| Error::ConfigError(format!("Invalid API key: {e}")))?,
118 );
119
120 headers.insert(
121 header::HeaderName::from_static("dd-application-key"),
122 header::HeaderValue::from_str(self.config.app_key.expose())
123 .map_err(|e| Error::ConfigError(format!("Invalid app key: {e}")))?,
124 );
125
126 headers.insert(
127 header::CONTENT_TYPE,
128 header::HeaderValue::from_static("application/json"),
129 );
130
131 headers.insert(
132 header::USER_AGENT,
133 header::HeaderValue::from_static("datadog-mcp/0.1.0"),
134 );
135
136 headers.insert(
137 header::ACCEPT_ENCODING,
138 header::HeaderValue::from_static("gzip"),
139 );
140
141 if let Some(endpoint) = endpoint {
143 if self.is_unstable_operation(endpoint) {
144 headers.insert(
145 header::HeaderName::from_static("dd-operation-unstable"),
146 header::HeaderValue::from_static("true"),
147 );
148 }
149 }
150
151 Ok(headers)
152 }
153
154 fn add_auth_headers(&self, builder: RequestBuilder, endpoint: &str) -> Result<RequestBuilder> {
155 Ok(builder.headers(self.build_headers(Some(endpoint))?))
156 }
157
158 async fn handle_response<T: DeserializeOwned>(&self, response: Response) -> Result<T> {
159 let status = response.status();
160
161 if status.is_success() {
162 trace!("Successful response with status: {status}");
163 response.json::<T>().await.map_err(Error::HttpError)
164 } else {
165 let status_code = status.as_u16();
166 let error_body = response.text().await.unwrap_or_else(|e| {
167 debug!("Failed to read error response body: {e}");
168 format!("(failed to read error body: {e})")
169 });
170
171 let sanitized_body = sanitize_log_message(&error_body);
172 error!("API error: {status_code} - {sanitized_body}");
173
174 Err(Error::ApiError {
175 status: status_code,
176 message: sanitized_body,
177 })
178 }
179 }
180
181 pub async fn get<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
182 self.rate_limiter.acquire().await;
183
184 let url = format!("{}{}", self.config.base_url(), endpoint);
185 debug!("GET {url}");
186
187 let request = self.client.get(&url);
188 let request = self.add_auth_headers(request, endpoint)?;
189
190 let response = request.send().await.map_err(Error::MiddlewareError)?;
191
192 self.handle_response(response).await
193 }
194
195 pub async fn get_with_query<T: DeserializeOwned, Q: serde::Serialize>(
196 &self,
197 endpoint: &str,
198 query: &Q,
199 ) -> Result<T> {
200 self.rate_limiter.acquire().await;
201
202 let url = format!("{}{}", self.config.base_url(), endpoint);
203
204 let request = self.client.get(&url).query(query);
205 let request = self.add_auth_headers(request, endpoint)?;
206
207 let response = request.send().await.map_err(Error::MiddlewareError)?;
208
209 debug!("Response status: {}", response.status());
210 self.handle_response(response).await
211 }
212
213 pub async fn post<T: DeserializeOwned, B: serde::Serialize>(
214 &self,
215 endpoint: &str,
216 body: &B,
217 ) -> Result<T> {
218 self.rate_limiter.acquire().await;
219
220 let url = format!("{}{}", self.config.base_url(), endpoint);
221 debug!("POST {url}");
222
223 let json_body = serde_json::to_string(body).map_err(Error::JsonError)?;
224 let request = self
225 .client
226 .post(&url)
227 .body(json_body)
228 .header(header::CONTENT_TYPE, "application/json");
229 let request = self.add_auth_headers(request, endpoint)?;
230
231 let response = request.send().await.map_err(Error::MiddlewareError)?;
232 self.handle_response(response).await
233 }
234
235 pub async fn put<T: DeserializeOwned, B: serde::Serialize>(
236 &self,
237 endpoint: &str,
238 body: &B,
239 ) -> Result<T> {
240 self.rate_limiter.acquire().await;
241
242 let url = format!("{}{}", self.config.base_url(), endpoint);
243 debug!("PUT {url}");
244
245 let json_body = serde_json::to_string(body).map_err(Error::JsonError)?;
246 let request = self
247 .client
248 .put(&url)
249 .body(json_body)
250 .header(header::CONTENT_TYPE, "application/json");
251 let request = self.add_auth_headers(request, endpoint)?;
252
253 let response = request.send().await.map_err(Error::MiddlewareError)?;
254 self.handle_response(response).await
255 }
256
257 pub async fn delete(&self, endpoint: &str) -> Result<()> {
258 self.rate_limiter.acquire().await;
259
260 let url = format!("{}{}", self.config.base_url(), endpoint);
261 debug!("DELETE {url}");
262
263 let request = self.client.delete(&url);
264 let request = self.add_auth_headers(request, endpoint)?;
265
266 let response = request.send().await.map_err(Error::MiddlewareError)?;
267
268 let status = response.status();
269 if status.is_success() {
270 Ok(())
271 } else {
272 let status_code = status.as_u16();
273 let error_body = response.text().await.unwrap_or_else(|e| {
274 debug!("Failed to read error response body: {e}");
275 format!("(failed to read error body: {e})")
276 });
277
278 let sanitized_body = sanitize_log_message(&error_body);
279 error!("API error: {} - {}", status_code, sanitized_body);
280
281 Err(Error::ApiError {
282 status: status_code,
283 message: sanitized_body,
284 })
285 }
286 }
287
288 pub async fn delete_with_response<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
289 self.rate_limiter.acquire().await;
290
291 let url = format!("{}{}", self.config.base_url(), endpoint);
292 debug!("DELETE {url}");
293
294 let request = self.client.delete(&url);
295 let request = self.add_auth_headers(request, endpoint)?;
296
297 let response = request.send().await.map_err(Error::MiddlewareError)?;
298 self.handle_response(response).await
299 }
300
301 #[must_use]
303 pub fn rate_limiter(&self) -> &RateLimiter {
304 &self.rate_limiter
305 }
306
307 pub async fn get_cached<T: DeserializeOwned>(
339 &self,
340 endpoint: &str,
341 cache_info: Option<&CacheInfo>,
342 ) -> Result<Option<CachedResponse<T>>> {
343 self.rate_limiter.acquire().await;
344
345 let url = format!("{}{}", self.config.base_url(), endpoint);
346 debug!("GET (cached) {url}");
347
348 let mut request = self.client.get(&url);
349
350 if let Some(info) = cache_info {
352 if let Some(etag) = &info.etag {
353 request = request.header(header::IF_NONE_MATCH, etag.as_str());
354 }
355 if let Some(last_modified) = &info.last_modified {
356 request = request.header(header::IF_MODIFIED_SINCE, last_modified.as_str());
357 }
358 }
359
360 let request = self.add_auth_headers(request, endpoint)?;
361 let response = request.send().await.map_err(Error::MiddlewareError)?;
362
363 if response.status() == reqwest::StatusCode::NOT_MODIFIED {
365 debug!("304 Not Modified - using cached data");
366 return Ok(None);
367 }
368
369 let new_cache_info = CacheInfo {
371 etag: response
372 .headers()
373 .get(header::ETAG)
374 .and_then(|v| v.to_str().ok())
375 .map(String::from),
376 last_modified: response
377 .headers()
378 .get(header::LAST_MODIFIED)
379 .and_then(|v| v.to_str().ok())
380 .map(String::from),
381 };
382
383 let data: T = self.handle_response(response).await?;
384
385 Ok(Some(CachedResponse {
386 data,
387 cache_info: new_cache_info,
388 }))
389 }
390}
391
/// HTTP cache validators remembered from an earlier response.
#[derive(Debug, Clone, Default)]
pub struct CacheInfo {
    /// Value of the `ETag` response header, if one was captured.
    pub etag: Option<String>,
    /// Value of the `Last-Modified` response header, if one was captured.
    pub last_modified: Option<String>,
}

impl CacheInfo {
    /// Returns `true` when at least one of `etag` or `last_modified` is set.
    #[must_use]
    pub fn has_validators(&self) -> bool {
        !matches!((&self.etag, &self.last_modified), (None, None))
    }
}
408
409#[derive(Debug, Clone)]
411pub struct CachedResponse<T> {
412 pub data: T,
414 pub cache_info: CacheInfo,
416}
417
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare `api_key: value` pair inside a JSON string is redacted.
    #[test]
    fn test_sanitize_json_api_key() {
        let sanitized = sanitize_log_message(r#"{"error": "Invalid api_key: abc123secret"}"#);
        assert!(!sanitized.contains("abc123secret"));
        assert!(sanitized.contains("[REDACTED]"));
    }

    /// Header-style `name: value` text is redacted.
    #[test]
    fn test_sanitize_header_style() {
        let sanitized = sanitize_log_message("dd-api-key: secret123abc");
        assert!(!sanitized.contains("secret123abc"));
        assert!(sanitized.contains("[REDACTED]"));
    }

    /// Several `NAME=value` assignments in one message are all redacted.
    #[test]
    fn test_sanitize_env_var_style() {
        let sanitized =
            sanitize_log_message("DD_API_KEY=mysecretkey and DD_APP_KEY=anothersecret");
        for secret in ["mysecretkey", "anothersecret"] {
            assert!(!sanitized.contains(secret));
        }
    }

    /// A quoted JSON member is redacted while unrelated members survive.
    #[test]
    fn test_sanitize_quoted_value() {
        let sanitized =
            sanitize_log_message(r#"{"api_key": "secret_value_here", "other": "data"}"#);
        assert!(!sanitized.contains("secret_value_here"));
        assert!(sanitized.contains("other"));
    }

    /// Text without any credential-like key passes through unchanged.
    #[test]
    fn test_sanitize_no_secrets() {
        let input = "This is a normal error message without any secrets";
        assert_eq!(input, sanitize_log_message(input));
    }

    /// Key names are matched regardless of letter case.
    #[test]
    fn test_sanitize_case_insensitive() {
        let sanitized = sanitize_log_message("API_KEY=secret123");
        assert!(!sanitized.contains("secret123"));
    }

    /// The default value carries no validators.
    #[test]
    fn test_cache_info_default() {
        let empty = CacheInfo::default();
        assert!(empty.etag.is_none());
        assert!(empty.last_modified.is_none());
        assert!(!empty.has_validators());
    }

    /// An ETag alone counts as a validator.
    #[test]
    fn test_cache_info_with_etag() {
        let etag_only = CacheInfo {
            etag: Some("\"abc123\"".to_string()),
            last_modified: None,
        };
        assert!(etag_only.has_validators());
    }

    /// A last-modified value alone counts as a validator.
    #[test]
    fn test_cache_info_with_last_modified() {
        let date_only = CacheInfo {
            etag: None,
            last_modified: Some("Wed, 21 Oct 2025 07:28:00 GMT".to_string()),
        };
        assert!(date_only.has_validators());
    }

    /// `CachedResponse` keeps both the payload and its validators.
    #[test]
    fn test_cached_response() {
        let cache_info = CacheInfo {
            etag: Some("\"test-etag\"".to_string()),
            last_modified: Some("Wed, 21 Oct 2025 07:28:00 GMT".to_string()),
        };
        let cached = CachedResponse {
            data: vec![1, 2, 3],
            cache_info,
        };
        assert_eq!(cached.data, vec![1, 2, 3]);
        assert!(cached.cache_info.has_validators());
    }
}