1#![cfg_attr(not(test), deny(clippy::panic))]
4#![cfg_attr(not(test), deny(clippy::unwrap_used))]
5#![cfg_attr(not(test), deny(clippy::expect_used))]
6#![cfg_attr(not(test), deny(clippy::todo))]
7#![cfg_attr(not(test), deny(clippy::unimplemented))]
8
9use anyhow::Context;
10use http::uri;
11use serde::de::Error;
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
14use std::{borrow::Cow, ops::Deref, path::PathBuf, str::FromStr};
15
16pub mod azure_app_services;
17#[cfg(not(target_arch = "wasm32"))]
18pub mod cc_utils;
19#[cfg(not(target_arch = "wasm32"))]
20pub mod connector;
21#[cfg(feature = "reqwest")]
22pub mod dump_server;
23pub mod entity_id;
24pub mod regex_engine;
25#[macro_use]
26pub mod cstr;
27#[cfg(feature = "bench-utils")]
28pub mod bench_utils;
29pub mod config;
30pub mod error;
31pub mod http_common;
32pub mod multipart;
33#[cfg(not(target_arch = "wasm32"))]
34pub mod rate_limiter;
35pub mod tag;
36#[cfg(any(test, feature = "test-utils"))]
37pub mod test_utils;
38#[cfg(not(target_arch = "wasm32"))]
39pub mod threading;
40#[cfg(not(target_arch = "wasm32"))]
41pub mod timeout;
42pub mod unix_utils;
43
44pub trait MutexExt<T> {
81 fn lock_or_panic(&self) -> MutexGuard<'_, T>;
82}
83
84impl<T> MutexExt<T> for Mutex<T> {
85 #[inline(always)]
86 #[track_caller]
87 fn lock_or_panic(&self) -> MutexGuard<'_, T> {
88 #[allow(clippy::unwrap_used)]
89 self.lock().unwrap()
90 }
91}
92
93pub trait RwLockExt<T> {
118 fn read_or_panic(&self) -> RwLockReadGuard<'_, T>;
119 fn write_or_panic(&self) -> RwLockWriteGuard<'_, T>;
120}
121
122impl<T> RwLockExt<T> for RwLock<T> {
123 #[inline(always)]
124 #[track_caller]
125 fn read_or_panic(&self) -> RwLockReadGuard<'_, T> {
126 #[allow(clippy::unwrap_used)]
127 self.read().unwrap()
128 }
129
130 #[inline(always)]
131 #[track_caller]
132 fn write_or_panic(&self) -> RwLockWriteGuard<'_, T> {
133 #[allow(clippy::unwrap_used)]
134 self.write().unwrap()
135 }
136}
137
138pub trait ResultInfallibleExt<T>: sealed::Sealed {
154 fn unwrap_infallible(self) -> T;
155}
156
157impl<T> ResultInfallibleExt<T> for Result<T, core::convert::Infallible> {
158 #[inline(always)]
159 fn unwrap_infallible(self) -> T {
160 match self {
161 Ok(value) => value,
162 Err(never) => match never {},
163 }
164 }
165}
166
167mod sealed {
168 pub trait Sealed {}
169 impl<T> Sealed for Result<T, core::convert::Infallible> {}
170}
171
172pub mod header {
173 #![allow(clippy::declare_interior_mutable_const)]
174 use http::{header::HeaderName, HeaderValue};
175
176 pub const APPLICATION_MSGPACK_STR: &str = "application/msgpack";
177 pub const APPLICATION_PROTOBUF_STR: &str = "application/x-protobuf";
178
179 pub const DATADOG_CONTAINER_ID: HeaderName = HeaderName::from_static("datadog-container-id");
180 pub const DATADOG_ENTITY_ID: HeaderName = HeaderName::from_static("datadog-entity-id");
181 pub const DATADOG_EXTERNAL_ENV: HeaderName = HeaderName::from_static("datadog-external-env");
182 pub const DATADOG_TRACE_COUNT: HeaderName = HeaderName::from_static("x-datadog-trace-count");
183 pub const DATADOG_SEND_REAL_HTTP_STATUS: HeaderName =
187 HeaderName::from_static("datadog-send-real-http-status");
188 pub const DATADOG_API_KEY: HeaderName = HeaderName::from_static("dd-api-key");
189 pub const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");
190 pub const APPLICATION_MSGPACK: HeaderValue = HeaderValue::from_static(APPLICATION_MSGPACK_STR);
191 pub const APPLICATION_PROTOBUF: HeaderValue =
192 HeaderValue::from_static(APPLICATION_PROTOBUF_STR);
193 pub const X_DATADOG_TEST_SESSION_TOKEN: HeaderName =
194 HeaderName::from_static("x-datadog-test-session-token");
195}
196
197#[cfg(not(target_arch = "wasm32"))]
198pub type HttpClient = http_common::GenericHttpClient<connector::Connector>;
199#[cfg(not(target_arch = "wasm32"))]
200pub type HttpResponse = http_common::HttpResponse;
201pub type HttpRequestBuilder = http::request::Builder;
202#[cfg(not(target_arch = "wasm32"))]
203pub trait Connect:
204 hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
205{
206}
207#[cfg(not(target_arch = "wasm32"))]
208impl<C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static> Connect
209 for C
210{
211}
212
213pub use const_format;
215
216#[derive(Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
217pub struct Endpoint {
218 #[serde(serialize_with = "serialize_uri", deserialize_with = "deserialize_uri")]
219 pub url: http::Uri,
220 pub api_key: Option<Cow<'static, str>>,
221 pub timeout_ms: u64,
222 pub test_token: Option<Cow<'static, str>>,
224 #[serde(default)]
227 pub use_system_resolver: bool,
228}
229
230impl Default for Endpoint {
231 fn default() -> Self {
232 Endpoint {
233 url: http::Uri::default(),
234 api_key: None,
235 timeout_ms: Self::DEFAULT_TIMEOUT,
236 test_token: None,
237 use_system_resolver: false,
238 }
239 }
240}
241
242#[derive(serde::Deserialize, serde::Serialize)]
243struct SerializedUri<'a> {
244 scheme: Option<Cow<'a, str>>,
245 authority: Option<Cow<'a, str>>,
246 path_and_query: Option<Cow<'a, str>>,
247}
248
249fn serialize_uri<S>(uri: &http::Uri, serializer: S) -> Result<S::Ok, S::Error>
250where
251 S: Serializer,
252{
253 let parts = uri.clone().into_parts();
254 let uri = SerializedUri {
255 scheme: parts.scheme.as_ref().map(|s| Cow::Borrowed(s.as_str())),
256 authority: parts.authority.as_ref().map(|s| Cow::Borrowed(s.as_str())),
257 path_and_query: parts
258 .path_and_query
259 .as_ref()
260 .map(|s| Cow::Borrowed(s.as_str())),
261 };
262 uri.serialize(serializer)
263}
264
265fn deserialize_uri<'de, D>(deserializer: D) -> Result<http::Uri, D::Error>
266where
267 D: Deserializer<'de>,
268{
269 let uri = SerializedUri::deserialize(deserializer)?;
270 let mut builder = http::Uri::builder();
271 if let Some(v) = uri.authority {
272 builder = builder.authority(v.deref());
273 }
274 if let Some(v) = uri.scheme {
275 builder = builder.scheme(v.deref());
276 }
277 if let Some(v) = uri.path_and_query {
278 builder = builder.path_and_query(v.deref());
279 }
280
281 builder.build().map_err(Error::custom)
282}
283
284pub fn parse_uri(uri: &str) -> anyhow::Result<http::Uri> {
293 if let Some(path) = uri.strip_prefix("unix://") {
294 encode_uri_path_in_authority("unix", path)
295 } else if let Some(path) = uri.strip_prefix("windows:") {
296 encode_uri_path_in_authority("windows", path)
297 } else if let Some(path) = uri.strip_prefix("file://") {
298 encode_uri_path_in_authority("file", path)
299 } else {
300 Ok(http::Uri::from_str(uri)?)
301 }
302}
303
304fn encode_uri_path_in_authority(scheme: &str, path: &str) -> anyhow::Result<http::Uri> {
305 let mut parts = uri::Parts::default();
306 parts.scheme = uri::Scheme::from_str(scheme).ok();
307
308 let path = hex::encode(path);
309
310 parts.authority = uri::Authority::from_str(path.as_str()).ok();
311 parts.path_and_query = Some(uri::PathAndQuery::from_static(""));
312 Ok(http::Uri::from_parts(parts)?)
313}
314
315pub fn decode_uri_path_in_authority(uri: &http::Uri) -> anyhow::Result<PathBuf> {
316 let path = hex::decode(uri.authority().context("missing uri authority")?.as_str())?;
317 #[cfg(unix)]
318 {
319 use std::os::unix::ffi::OsStringExt;
320 Ok(PathBuf::from(std::ffi::OsString::from_vec(path)))
321 }
322 #[cfg(not(unix))]
323 {
324 match String::from_utf8(path) {
325 Ok(s) => Ok(PathBuf::from(s.as_str())),
326 _ => Err(anyhow::anyhow!("file uri should be utf-8")),
327 }
328 }
329}
330
331impl Endpoint {
332 pub const DEFAULT_TIMEOUT: u64 = 3_000;
334
335 pub fn get_optional_headers(&self) -> impl Iterator<Item = (&'static str, &str)> {
338 [
339 self.api_key.as_ref().map(|v| ("dd-api-key", v.as_ref())),
340 self.test_token
341 .as_ref()
342 .map(|v| ("x-datadog-test-session-token", v.as_ref())),
343 ]
344 .into_iter()
345 .flatten()
346 }
347
348 pub fn set_standard_headers(
351 &self,
352 mut builder: http::request::Builder,
353 user_agent: &str,
354 ) -> http::request::Builder {
355 builder = builder.header("user-agent", user_agent);
356 for (name, value) in self.get_optional_headers() {
357 builder = builder.header(name, value);
358 }
359 for (name, value) in entity_id::get_entity_headers() {
360 builder = builder.header(name, value);
361 }
362 builder
363 }
364
365 pub fn to_request_builder(&self, user_agent: &str) -> anyhow::Result<HttpRequestBuilder> {
370 let mut builder = http::Request::builder()
371 .uri(self.url.clone())
372 .header(http::header::USER_AGENT, user_agent);
373
374 for (name, value) in self.get_optional_headers() {
376 builder = builder.header(name, value);
377 }
378
379 for (name, value) in entity_id::get_entity_headers() {
381 builder = builder.header(name, value);
382 }
383
384 Ok(builder)
385 }
386
387 #[inline]
388 pub fn from_slice(url: &str) -> Endpoint {
389 Endpoint {
390 #[allow(clippy::unwrap_used)]
391 url: parse_uri(url).unwrap(),
392 ..Default::default()
393 }
394 }
395
396 #[inline]
397 pub fn from_url(url: http::Uri) -> Endpoint {
398 Endpoint {
399 url,
400 ..Default::default()
401 }
402 }
403
404 pub fn is_file_endpoint(&self) -> bool {
405 self.url.scheme_str() == Some("file")
406 }
407
408 pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
417 self.timeout_ms = if timeout_ms == 0 {
418 Self::DEFAULT_TIMEOUT
419 } else {
420 timeout_ms
421 };
422 self
423 }
424
425 pub fn with_system_resolver(mut self, use_system_resolver: bool) -> Self {
428 self.use_system_resolver = use_system_resolver;
429 self
430 }
431
432 #[cfg(feature = "reqwest")]
455 pub fn to_reqwest_client_builder(&self) -> anyhow::Result<(reqwest::ClientBuilder, String)> {
456 use anyhow::Context;
457
458 let mut builder = reqwest::Client::builder()
463 .timeout(std::time::Duration::from_millis(self.timeout_ms))
464 .hickory_dns(!self.use_system_resolver)
465 .no_proxy();
466
467 let request_url = match self.url.scheme_str() {
468 Some("http") | Some("https") => self.url.to_string(),
470
471 Some("file") => {
473 let output_path = decode_uri_path_in_authority(&self.url)
474 .context("Failed to decode file path from URI")?;
475 let socket_or_pipe_path = dump_server::spawn_dump_server(output_path)?;
476
477 #[cfg(unix)]
479 {
480 builder = builder.unix_socket(socket_or_pipe_path);
481 }
482 #[cfg(windows)]
483 {
484 builder = builder
485 .windows_named_pipe(socket_or_pipe_path.to_string_lossy().to_string());
486 }
487
488 "http://localhost/".to_string()
489 }
490
491 #[cfg(unix)]
493 Some("unix") => {
494 use connector::uds::socket_path_from_uri;
495 let socket_path = socket_path_from_uri(&self.url)?;
496 builder = builder.unix_socket(socket_path);
497 format!("http://localhost{}", self.url.path())
498 }
499
500 #[cfg(windows)]
502 Some("windows") => {
503 use connector::named_pipe::named_pipe_path_from_uri;
504 let pipe_path = named_pipe_path_from_uri(&self.url)?;
505 builder = builder.windows_named_pipe(pipe_path.to_string_lossy().to_string());
506 format!("http://localhost{}", self.url.path())
507 }
508
509 scheme => anyhow::bail!("Unsupported endpoint scheme: {:?}", scheme),
511 };
512
513 Ok((builder, request_url))
514 }
515}