1#![cfg_attr(not(test), deny(clippy::panic))]
4#![cfg_attr(not(test), deny(clippy::unwrap_used))]
5#![cfg_attr(not(test), deny(clippy::expect_used))]
6#![cfg_attr(not(test), deny(clippy::todo))]
7#![cfg_attr(not(test), deny(clippy::unimplemented))]
8
9use anyhow::Context;
10use http::uri;
11use serde::de::Error;
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13use std::sync::{Mutex, MutexGuard};
14use std::{borrow::Cow, ops::Deref, path::PathBuf, str::FromStr};
15
16pub mod azure_app_services;
17#[cfg(not(target_arch = "wasm32"))]
18pub mod cc_utils;
19#[cfg(not(target_arch = "wasm32"))]
20pub mod connector;
21#[cfg(feature = "reqwest")]
22pub mod dump_server;
23pub mod entity_id;
24#[macro_use]
25pub mod cstr;
26#[cfg(feature = "bench-utils")]
27pub mod bench_utils;
28pub mod config;
29pub mod error;
30pub mod http_common;
31pub mod multipart;
32#[cfg(not(target_arch = "wasm32"))]
33pub mod rate_limiter;
34pub mod tag;
35#[cfg(any(test, feature = "test-utils"))]
36pub mod test_utils;
37#[cfg(not(target_arch = "wasm32"))]
38pub mod threading;
39#[cfg(not(target_arch = "wasm32"))]
40pub mod timeout;
41pub mod unix_utils;
42
43pub trait MutexExt<T> {
80 fn lock_or_panic(&self) -> MutexGuard<'_, T>;
81}
82
83impl<T> MutexExt<T> for Mutex<T> {
84 #[inline(always)]
85 #[track_caller]
86 fn lock_or_panic(&self) -> MutexGuard<'_, T> {
87 #[allow(clippy::unwrap_used)]
88 self.lock().unwrap()
89 }
90}
91
92pub mod header {
93 #![allow(clippy::declare_interior_mutable_const)]
94 use http::{header::HeaderName, HeaderValue};
95
96 pub const APPLICATION_MSGPACK_STR: &str = "application/msgpack";
97 pub const APPLICATION_PROTOBUF_STR: &str = "application/x-protobuf";
98
99 pub const DATADOG_CONTAINER_ID: HeaderName = HeaderName::from_static("datadog-container-id");
100 pub const DATADOG_ENTITY_ID: HeaderName = HeaderName::from_static("datadog-entity-id");
101 pub const DATADOG_EXTERNAL_ENV: HeaderName = HeaderName::from_static("datadog-external-env");
102 pub const DATADOG_TRACE_COUNT: HeaderName = HeaderName::from_static("x-datadog-trace-count");
103 pub const DATADOG_SEND_REAL_HTTP_STATUS: HeaderName =
107 HeaderName::from_static("datadog-send-real-http-status");
108 pub const DATADOG_API_KEY: HeaderName = HeaderName::from_static("dd-api-key");
109 pub const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");
110 pub const APPLICATION_MSGPACK: HeaderValue = HeaderValue::from_static(APPLICATION_MSGPACK_STR);
111 pub const APPLICATION_PROTOBUF: HeaderValue =
112 HeaderValue::from_static(APPLICATION_PROTOBUF_STR);
113 pub const X_DATADOG_TEST_SESSION_TOKEN: HeaderName =
114 HeaderName::from_static("x-datadog-test-session-token");
115}
116
117#[cfg(not(target_arch = "wasm32"))]
118pub type HttpClient = http_common::GenericHttpClient<connector::Connector>;
119#[cfg(not(target_arch = "wasm32"))]
120pub type HttpResponse = http_common::HttpResponse;
121pub type HttpRequestBuilder = http::request::Builder;
122#[cfg(not(target_arch = "wasm32"))]
123pub trait Connect:
124 hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
125{
126}
127#[cfg(not(target_arch = "wasm32"))]
128impl<C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static> Connect
129 for C
130{
131}
132
133pub use const_format;
135
136#[derive(Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
137pub struct Endpoint {
138 #[serde(serialize_with = "serialize_uri", deserialize_with = "deserialize_uri")]
139 pub url: http::Uri,
140 pub api_key: Option<Cow<'static, str>>,
141 pub timeout_ms: u64,
142 pub test_token: Option<Cow<'static, str>>,
144 #[serde(default)]
147 pub use_system_resolver: bool,
148}
149
150impl Default for Endpoint {
151 fn default() -> Self {
152 Endpoint {
153 url: http::Uri::default(),
154 api_key: None,
155 timeout_ms: Self::DEFAULT_TIMEOUT,
156 test_token: None,
157 use_system_resolver: false,
158 }
159 }
160}
161
162#[derive(serde::Deserialize, serde::Serialize)]
163struct SerializedUri<'a> {
164 scheme: Option<Cow<'a, str>>,
165 authority: Option<Cow<'a, str>>,
166 path_and_query: Option<Cow<'a, str>>,
167}
168
169fn serialize_uri<S>(uri: &http::Uri, serializer: S) -> Result<S::Ok, S::Error>
170where
171 S: Serializer,
172{
173 let parts = uri.clone().into_parts();
174 let uri = SerializedUri {
175 scheme: parts.scheme.as_ref().map(|s| Cow::Borrowed(s.as_str())),
176 authority: parts.authority.as_ref().map(|s| Cow::Borrowed(s.as_str())),
177 path_and_query: parts
178 .path_and_query
179 .as_ref()
180 .map(|s| Cow::Borrowed(s.as_str())),
181 };
182 uri.serialize(serializer)
183}
184
185fn deserialize_uri<'de, D>(deserializer: D) -> Result<http::Uri, D::Error>
186where
187 D: Deserializer<'de>,
188{
189 let uri = SerializedUri::deserialize(deserializer)?;
190 let mut builder = http::Uri::builder();
191 if let Some(v) = uri.authority {
192 builder = builder.authority(v.deref());
193 }
194 if let Some(v) = uri.scheme {
195 builder = builder.scheme(v.deref());
196 }
197 if let Some(v) = uri.path_and_query {
198 builder = builder.path_and_query(v.deref());
199 }
200
201 builder.build().map_err(Error::custom)
202}
203
204pub fn parse_uri(uri: &str) -> anyhow::Result<http::Uri> {
213 if let Some(path) = uri.strip_prefix("unix://") {
214 encode_uri_path_in_authority("unix", path)
215 } else if let Some(path) = uri.strip_prefix("windows:") {
216 encode_uri_path_in_authority("windows", path)
217 } else if let Some(path) = uri.strip_prefix("file://") {
218 encode_uri_path_in_authority("file", path)
219 } else {
220 Ok(http::Uri::from_str(uri)?)
221 }
222}
223
224fn encode_uri_path_in_authority(scheme: &str, path: &str) -> anyhow::Result<http::Uri> {
225 let mut parts = uri::Parts::default();
226 parts.scheme = uri::Scheme::from_str(scheme).ok();
227
228 let path = hex::encode(path);
229
230 parts.authority = uri::Authority::from_str(path.as_str()).ok();
231 parts.path_and_query = Some(uri::PathAndQuery::from_static(""));
232 Ok(http::Uri::from_parts(parts)?)
233}
234
235pub fn decode_uri_path_in_authority(uri: &http::Uri) -> anyhow::Result<PathBuf> {
236 let path = hex::decode(uri.authority().context("missing uri authority")?.as_str())?;
237 #[cfg(unix)]
238 {
239 use std::os::unix::ffi::OsStringExt;
240 Ok(PathBuf::from(std::ffi::OsString::from_vec(path)))
241 }
242 #[cfg(not(unix))]
243 {
244 match String::from_utf8(path) {
245 Ok(s) => Ok(PathBuf::from(s.as_str())),
246 _ => Err(anyhow::anyhow!("file uri should be utf-8")),
247 }
248 }
249}
250
251impl Endpoint {
252 pub const DEFAULT_TIMEOUT: u64 = 3_000;
254
255 pub fn get_optional_headers(&self) -> impl Iterator<Item = (&'static str, &str)> {
258 [
259 self.api_key.as_ref().map(|v| ("dd-api-key", v.as_ref())),
260 self.test_token
261 .as_ref()
262 .map(|v| ("x-datadog-test-session-token", v.as_ref())),
263 ]
264 .into_iter()
265 .flatten()
266 }
267
268 pub fn set_standard_headers(
271 &self,
272 mut builder: http::request::Builder,
273 user_agent: &str,
274 ) -> http::request::Builder {
275 builder = builder.header("user-agent", user_agent);
276 for (name, value) in self.get_optional_headers() {
277 builder = builder.header(name, value);
278 }
279 for (name, value) in entity_id::get_entity_headers() {
280 builder = builder.header(name, value);
281 }
282 builder
283 }
284
285 pub fn to_request_builder(&self, user_agent: &str) -> anyhow::Result<HttpRequestBuilder> {
290 let mut builder = http::Request::builder()
291 .uri(self.url.clone())
292 .header(http::header::USER_AGENT, user_agent);
293
294 for (name, value) in self.get_optional_headers() {
296 builder = builder.header(name, value);
297 }
298
299 for (name, value) in entity_id::get_entity_headers() {
301 builder = builder.header(name, value);
302 }
303
304 Ok(builder)
305 }
306
307 #[inline]
308 pub fn from_slice(url: &str) -> Endpoint {
309 Endpoint {
310 #[allow(clippy::unwrap_used)]
311 url: parse_uri(url).unwrap(),
312 ..Default::default()
313 }
314 }
315
316 #[inline]
317 pub fn from_url(url: http::Uri) -> Endpoint {
318 Endpoint {
319 url,
320 ..Default::default()
321 }
322 }
323
324 pub fn is_file_endpoint(&self) -> bool {
325 self.url.scheme_str() == Some("file")
326 }
327
328 pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
337 self.timeout_ms = if timeout_ms == 0 {
338 Self::DEFAULT_TIMEOUT
339 } else {
340 timeout_ms
341 };
342 self
343 }
344
345 pub fn with_system_resolver(mut self, use_system_resolver: bool) -> Self {
348 self.use_system_resolver = use_system_resolver;
349 self
350 }
351
352 #[cfg(feature = "reqwest")]
375 pub fn to_reqwest_client_builder(&self) -> anyhow::Result<(reqwest::ClientBuilder, String)> {
376 use anyhow::Context;
377
378 let mut builder = reqwest::Client::builder()
383 .timeout(std::time::Duration::from_millis(self.timeout_ms))
384 .hickory_dns(!self.use_system_resolver)
385 .no_proxy();
386
387 let request_url = match self.url.scheme_str() {
388 Some("http") | Some("https") => self.url.to_string(),
390
391 Some("file") => {
393 let output_path = decode_uri_path_in_authority(&self.url)
394 .context("Failed to decode file path from URI")?;
395 let socket_or_pipe_path = dump_server::spawn_dump_server(output_path)?;
396
397 #[cfg(unix)]
399 {
400 builder = builder.unix_socket(socket_or_pipe_path);
401 }
402 #[cfg(windows)]
403 {
404 builder = builder
405 .windows_named_pipe(socket_or_pipe_path.to_string_lossy().to_string());
406 }
407
408 "http://localhost/".to_string()
409 }
410
411 #[cfg(unix)]
413 Some("unix") => {
414 use connector::uds::socket_path_from_uri;
415 let socket_path = socket_path_from_uri(&self.url)?;
416 builder = builder.unix_socket(socket_path);
417 format!("http://localhost{}", self.url.path())
418 }
419
420 #[cfg(windows)]
422 Some("windows") => {
423 use connector::named_pipe::named_pipe_path_from_uri;
424 let pipe_path = named_pipe_path_from_uri(&self.url)?;
425 builder = builder.windows_named_pipe(pipe_path.to_string_lossy().to_string());
426 format!("http://localhost{}", self.url.path())
427 }
428
429 scheme => anyhow::bail!("Unsupported endpoint scheme: {:?}", scheme),
431 };
432
433 Ok((builder, request_url))
434 }
435}