1#![cfg_attr(not(test), deny(clippy::panic))]
4#![cfg_attr(not(test), deny(clippy::unwrap_used))]
5#![cfg_attr(not(test), deny(clippy::expect_used))]
6#![cfg_attr(not(test), deny(clippy::todo))]
7#![cfg_attr(not(test), deny(clippy::unimplemented))]
8
9use anyhow::Context;
10use hyper::http::uri;
11use serde::de::Error;
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13use std::sync::{Mutex, MutexGuard};
14use std::{borrow::Cow, ops::Deref, path::PathBuf, str::FromStr};
15
16pub mod azure_app_services;
17pub mod cc_utils;
18pub mod connector;
19#[cfg(feature = "reqwest")]
20pub mod dump_server;
21pub mod entity_id;
22#[macro_use]
23pub mod cstr;
24pub mod config;
25pub mod error;
26pub mod http_common;
27pub mod rate_limiter;
28pub mod tag;
29#[cfg(any(test, feature = "test-utils"))]
30pub mod test_utils;
31pub mod threading;
32pub mod timeout;
33pub mod unix_utils;
34pub mod worker;
35
/// Extension trait for [`Mutex`] exposing a lock accessor that panics instead of
/// returning a `Result`.
pub trait MutexExt<T> {
    /// Acquires the mutex and returns its guard.
    ///
    /// # Panics
    ///
    /// Panics when `Mutex::lock` returns an error, i.e. when the mutex is poisoned.
    fn lock_or_panic(&self) -> MutexGuard<'_, T>;
}

impl<T> MutexExt<T> for Mutex<T> {
    #[inline(always)]
    #[track_caller]
    fn lock_or_panic(&self) -> MutexGuard<'_, T> {
        // Panicking here is the documented contract of this method, hence the lint exemption.
        #[allow(clippy::unwrap_used)]
        Mutex::lock(self).unwrap()
    }
}
84
85pub mod header {
86 #![allow(clippy::declare_interior_mutable_const)]
87 use hyper::{header::HeaderName, http::HeaderValue};
88
89 pub const DATADOG_SEND_REAL_HTTP_STATUS_STR: &str = "datadog-send-real-http-status";
92 pub const DATADOG_TRACE_COUNT_STR: &str = "x-datadog-trace-count";
93 pub const APPLICATION_MSGPACK_STR: &str = "application/msgpack";
94 pub const APPLICATION_PROTOBUF_STR: &str = "application/x-protobuf";
95
96 pub const DATADOG_CONTAINER_ID: HeaderName = HeaderName::from_static("datadog-container-id");
97 pub const DATADOG_ENTITY_ID: HeaderName = HeaderName::from_static("datadog-entity-id");
98 pub const DATADOG_EXTERNAL_ENV: HeaderName = HeaderName::from_static("datadog-external-env");
99 pub const DATADOG_TRACE_COUNT: HeaderName = HeaderName::from_static("x-datadog-trace-count");
100 pub const DATADOG_SEND_REAL_HTTP_STATUS: HeaderName =
104 HeaderName::from_static(DATADOG_SEND_REAL_HTTP_STATUS_STR);
105 pub const DATADOG_API_KEY: HeaderName = HeaderName::from_static("dd-api-key");
106 pub const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");
107 pub const APPLICATION_MSGPACK: HeaderValue = HeaderValue::from_static(APPLICATION_MSGPACK_STR);
108 pub const APPLICATION_PROTOBUF: HeaderValue =
109 HeaderValue::from_static(APPLICATION_PROTOBUF_STR);
110 pub const X_DATADOG_TEST_SESSION_TOKEN: HeaderName =
111 HeaderName::from_static("x-datadog-test-session-token");
112}
113
114pub type HttpClient = http_common::GenericHttpClient<connector::Connector>;
115pub type GenericHttpClient<C> = http_common::GenericHttpClient<C>;
116pub type HttpResponse = http_common::HttpResponse;
117pub type HttpRequestBuilder = hyper::http::request::Builder;
118pub trait Connect:
119 hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
120{
121}
122impl<C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static> Connect
123 for C
124{
125}
126
127pub use const_format;
129
130#[derive(Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
131pub struct Endpoint {
132 #[serde(serialize_with = "serialize_uri", deserialize_with = "deserialize_uri")]
133 pub url: hyper::Uri,
134 pub api_key: Option<Cow<'static, str>>,
135 pub timeout_ms: u64,
136 pub test_token: Option<Cow<'static, str>>,
138 #[serde(default)]
141 pub use_system_resolver: bool,
142}
143
144impl Default for Endpoint {
145 fn default() -> Self {
146 Endpoint {
147 url: hyper::Uri::default(),
148 api_key: None,
149 timeout_ms: Self::DEFAULT_TIMEOUT,
150 test_token: None,
151 use_system_resolver: false,
152 }
153 }
154}
155
156#[derive(serde::Deserialize, serde::Serialize)]
157struct SerializedUri<'a> {
158 scheme: Option<Cow<'a, str>>,
159 authority: Option<Cow<'a, str>>,
160 path_and_query: Option<Cow<'a, str>>,
161}
162
163fn serialize_uri<S>(uri: &hyper::Uri, serializer: S) -> Result<S::Ok, S::Error>
164where
165 S: Serializer,
166{
167 let parts = uri.clone().into_parts();
168 let uri = SerializedUri {
169 scheme: parts.scheme.as_ref().map(|s| Cow::Borrowed(s.as_str())),
170 authority: parts.authority.as_ref().map(|s| Cow::Borrowed(s.as_str())),
171 path_and_query: parts
172 .path_and_query
173 .as_ref()
174 .map(|s| Cow::Borrowed(s.as_str())),
175 };
176 uri.serialize(serializer)
177}
178
179fn deserialize_uri<'de, D>(deserializer: D) -> Result<hyper::Uri, D::Error>
180where
181 D: Deserializer<'de>,
182{
183 let uri = SerializedUri::deserialize(deserializer)?;
184 let mut builder = hyper::Uri::builder();
185 if let Some(v) = uri.authority {
186 builder = builder.authority(v.deref());
187 }
188 if let Some(v) = uri.scheme {
189 builder = builder.scheme(v.deref());
190 }
191 if let Some(v) = uri.path_and_query {
192 builder = builder.path_and_query(v.deref());
193 }
194
195 builder.build().map_err(Error::custom)
196}
197
198pub fn parse_uri(uri: &str) -> anyhow::Result<hyper::Uri> {
207 if let Some(path) = uri.strip_prefix("unix://") {
208 encode_uri_path_in_authority("unix", path)
209 } else if let Some(path) = uri.strip_prefix("windows:") {
210 encode_uri_path_in_authority("windows", path)
211 } else if let Some(path) = uri.strip_prefix("file://") {
212 encode_uri_path_in_authority("file", path)
213 } else {
214 Ok(hyper::Uri::from_str(uri)?)
215 }
216}
217
218fn encode_uri_path_in_authority(scheme: &str, path: &str) -> anyhow::Result<hyper::Uri> {
219 let mut parts = uri::Parts::default();
220 parts.scheme = uri::Scheme::from_str(scheme).ok();
221
222 let path = hex::encode(path);
223
224 parts.authority = uri::Authority::from_str(path.as_str()).ok();
225 parts.path_and_query = Some(uri::PathAndQuery::from_static(""));
226 Ok(hyper::Uri::from_parts(parts)?)
227}
228
229pub fn decode_uri_path_in_authority(uri: &hyper::Uri) -> anyhow::Result<PathBuf> {
230 let path = hex::decode(uri.authority().context("missing uri authority")?.as_str())?;
231 #[cfg(unix)]
232 {
233 use std::os::unix::ffi::OsStringExt;
234 Ok(PathBuf::from(std::ffi::OsString::from_vec(path)))
235 }
236 #[cfg(not(unix))]
237 {
238 match String::from_utf8(path) {
239 Ok(s) => Ok(PathBuf::from(s.as_str())),
240 _ => Err(anyhow::anyhow!("file uri should be utf-8")),
241 }
242 }
243}
244
245impl Endpoint {
246 pub const DEFAULT_TIMEOUT: u64 = 3_000;
248
249 pub fn get_optional_headers(&self) -> impl Iterator<Item = (&'static str, &str)> {
252 [
253 self.api_key.as_ref().map(|v| ("dd-api-key", v.as_ref())),
254 self.test_token
255 .as_ref()
256 .map(|v| ("x-datadog-test-session-token", v.as_ref())),
257 ]
258 .into_iter()
259 .flatten()
260 }
261
262 pub fn to_request_builder(&self, user_agent: &str) -> anyhow::Result<HttpRequestBuilder> {
267 let mut builder = hyper::Request::builder()
268 .uri(self.url.clone())
269 .header(hyper::header::USER_AGENT, user_agent);
270
271 for (name, value) in self.get_optional_headers() {
273 builder = builder.header(name, value);
274 }
275
276 for (name, value) in entity_id::get_entity_headers() {
278 builder = builder.header(name, value);
279 }
280
281 Ok(builder)
282 }
283
284 #[inline]
285 pub fn from_slice(url: &str) -> Endpoint {
286 Endpoint {
287 #[allow(clippy::unwrap_used)]
288 url: parse_uri(url).unwrap(),
289 ..Default::default()
290 }
291 }
292
293 #[inline]
294 pub fn from_url(url: hyper::Uri) -> Endpoint {
295 Endpoint {
296 url,
297 ..Default::default()
298 }
299 }
300
301 pub fn is_file_endpoint(&self) -> bool {
302 self.url.scheme_str() == Some("file")
303 }
304
305 pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
314 self.timeout_ms = if timeout_ms == 0 {
315 Self::DEFAULT_TIMEOUT
316 } else {
317 timeout_ms
318 };
319 self
320 }
321
322 pub fn with_system_resolver(mut self, use_system_resolver: bool) -> Self {
325 self.use_system_resolver = use_system_resolver;
326 self
327 }
328
329 #[cfg(feature = "reqwest")]
352 pub fn to_reqwest_client_builder(&self) -> anyhow::Result<(reqwest::ClientBuilder, String)> {
353 use anyhow::Context;
354
355 let mut builder = reqwest::Client::builder()
356 .timeout(std::time::Duration::from_millis(self.timeout_ms))
357 .hickory_dns(!self.use_system_resolver);
358
359 let request_url = match self.url.scheme_str() {
360 Some("http") | Some("https") => self.url.to_string(),
362
363 Some("file") => {
365 let output_path = decode_uri_path_in_authority(&self.url)
366 .context("Failed to decode file path from URI")?;
367 let socket_or_pipe_path = dump_server::spawn_dump_server(output_path)?;
368
369 #[cfg(unix)]
371 {
372 builder = builder.unix_socket(socket_or_pipe_path);
373 }
374 #[cfg(windows)]
375 {
376 builder = builder
377 .windows_named_pipe(socket_or_pipe_path.to_string_lossy().to_string());
378 }
379
380 "http://localhost/".to_string()
381 }
382
383 #[cfg(unix)]
385 Some("unix") => {
386 use connector::uds::socket_path_from_uri;
387 let socket_path = socket_path_from_uri(&self.url)?;
388 builder = builder.unix_socket(socket_path);
389 format!("http://localhost{}", self.url.path())
390 }
391
392 #[cfg(windows)]
394 Some("windows") => {
395 use connector::named_pipe::named_pipe_path_from_uri;
396 let pipe_path = named_pipe_path_from_uri(&self.url)?;
397 builder = builder.windows_named_pipe(pipe_path.to_string_lossy().to_string());
398 format!("http://localhost{}", self.url.path())
399 }
400
401 scheme => anyhow::bail!("Unsupported endpoint scheme: {:?}", scheme),
403 };
404
405 Ok((builder, request_url))
406 }
407}