#![cfg_attr(not(test), deny(clippy::panic))]
#![cfg_attr(not(test), deny(clippy::unwrap_used))]
#![cfg_attr(not(test), deny(clippy::expect_used))]
#![cfg_attr(not(test), deny(clippy::todo))]
#![cfg_attr(not(test), deny(clippy::unimplemented))]
use anyhow::Context;
use http::uri;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::sync::{Mutex, MutexGuard};
use std::{borrow::Cow, ops::Deref, path::PathBuf, str::FromStr};
pub mod azure_app_services;
#[cfg(not(target_arch = "wasm32"))]
pub mod cc_utils;
#[cfg(not(target_arch = "wasm32"))]
pub mod connector;
#[cfg(feature = "reqwest")]
pub mod dump_server;
pub mod entity_id;
#[macro_use]
pub mod cstr;
#[cfg(feature = "bench-utils")]
pub mod bench_utils;
pub mod config;
pub mod error;
pub mod http_common;
pub mod multipart;
#[cfg(not(target_arch = "wasm32"))]
pub mod rate_limiter;
pub mod tag;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
#[cfg(not(target_arch = "wasm32"))]
pub mod threading;
#[cfg(not(target_arch = "wasm32"))]
pub mod timeout;
pub mod unix_utils;
/// Extension trait adding a panicking lock helper to [`Mutex`].
pub trait MutexExt<T> {
    /// Locks the mutex and returns its guard.
    ///
    /// # Panics
    ///
    /// Panics if acquiring the lock returns an error (i.e. the mutex is poisoned).
    fn lock_or_panic(&self) -> MutexGuard<'_, T>;
}

impl<T> MutexExt<T> for Mutex<T> {
    #[inline(always)]
    #[track_caller]
    fn lock_or_panic(&self) -> MutexGuard<'_, T> {
        let lock_result = self.lock();
        // The crate denies `unwrap_used` outside tests; this helper is the one
        // deliberate place where a failed lock is turned into a panic.
        #[allow(clippy::unwrap_used)]
        lock_result.unwrap()
    }
}
/// HTTP header names and content-type values used when talking to Datadog endpoints.
pub mod header {
// NOTE(review): presumably required because `HeaderName`/`HeaderValue` constants are flagged by
// this lint — confirm before removing.
#![allow(clippy::declare_interior_mutable_const)]
use http::{header::HeaderName, HeaderValue};
/// MessagePack content type as a plain string.
pub const APPLICATION_MSGPACK_STR: &str = "application/msgpack";
/// Protobuf content type as a plain string.
pub const APPLICATION_PROTOBUF_STR: &str = "application/x-protobuf";
/// `datadog-container-id` header name.
pub const DATADOG_CONTAINER_ID: HeaderName = HeaderName::from_static("datadog-container-id");
/// `datadog-entity-id` header name.
pub const DATADOG_ENTITY_ID: HeaderName = HeaderName::from_static("datadog-entity-id");
/// `datadog-external-env` header name.
pub const DATADOG_EXTERNAL_ENV: HeaderName = HeaderName::from_static("datadog-external-env");
/// `x-datadog-trace-count` header name.
pub const DATADOG_TRACE_COUNT: HeaderName = HeaderName::from_static("x-datadog-trace-count");
/// `datadog-send-real-http-status` header name.
pub const DATADOG_SEND_REAL_HTTP_STATUS: HeaderName =
HeaderName::from_static("datadog-send-real-http-status");
/// `dd-api-key` header name.
pub const DATADOG_API_KEY: HeaderName = HeaderName::from_static("dd-api-key");
/// `application/json` content-type value.
pub const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");
/// Header-value form of [`APPLICATION_MSGPACK_STR`].
pub const APPLICATION_MSGPACK: HeaderValue = HeaderValue::from_static(APPLICATION_MSGPACK_STR);
/// Header-value form of [`APPLICATION_PROTOBUF_STR`].
pub const APPLICATION_PROTOBUF: HeaderValue =
HeaderValue::from_static(APPLICATION_PROTOBUF_STR);
/// `x-datadog-test-session-token` header name.
pub const X_DATADOG_TEST_SESSION_TOKEN: HeaderName =
HeaderName::from_static("x-datadog-test-session-token");
}
/// HTTP client specialised to this crate's [`connector::Connector`] (not available on wasm32).
#[cfg(not(target_arch = "wasm32"))]
pub type HttpClient = http_common::GenericHttpClient<connector::Connector>;
/// Re-export of [`http_common::HttpResponse`] under the crate root (not available on wasm32).
#[cfg(not(target_arch = "wasm32"))]
pub type HttpResponse = http_common::HttpResponse;
/// Request builder type returned by [`Endpoint::to_request_builder`].
pub type HttpRequestBuilder = http::request::Builder;
/// Shorthand bound for connectors: a `hyper_util` legacy `Connect` that is also
/// `Clone + Send + Sync + 'static`.
#[cfg(not(target_arch = "wasm32"))]
pub trait Connect:
hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
{
}
// Blanket impl: every type meeting the bounds above implements `Connect` automatically.
#[cfg(not(target_arch = "wasm32"))]
impl<C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static> Connect
for C
{
}
pub use const_format;
/// Description of a remote endpoint: where to send requests and which optional
/// credentials/settings to apply.
#[derive(Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
pub struct Endpoint {
/// Target URI; (de)serialized through `serialize_uri` / `deserialize_uri` as its
/// scheme, authority and path-and-query components.
#[serde(serialize_with = "serialize_uri", deserialize_with = "deserialize_uri")]
pub url: http::Uri,
/// Optional API key, sent as the `dd-api-key` header when present.
pub api_key: Option<Cow<'static, str>>,
/// Request timeout in milliseconds.
pub timeout_ms: u64,
/// Optional test session token, sent as the `x-datadog-test-session-token` header when present.
pub test_token: Option<Cow<'static, str>>,
/// When `true`, the reqwest client is built with hickory DNS disabled.
/// Defaults to `false` when absent from serialized data.
#[serde(default)]
pub use_system_resolver: bool,
}
impl Default for Endpoint {
    /// Returns an endpoint with the default `http::Uri`, no API key, no test token,
    /// the default timeout and the system resolver disabled.
    fn default() -> Self {
        let url = http::Uri::default();
        let timeout_ms = Self::DEFAULT_TIMEOUT;
        Self {
            url,
            timeout_ms,
            api_key: None,
            test_token: None,
            use_system_resolver: false,
        }
    }
}
/// Serde-friendly representation of an `http::Uri`, split into its three optional components.
/// Used by `serialize_uri` and `deserialize_uri`.
#[derive(serde::Deserialize, serde::Serialize)]
struct SerializedUri<'a> {
// Each component is `None` when the URI does not carry it.
scheme: Option<Cow<'a, str>>,
authority: Option<Cow<'a, str>>,
path_and_query: Option<Cow<'a, str>>,
}
/// Serializes an `http::Uri` as a [`SerializedUri`] (scheme, authority, path-and-query).
///
/// The components are borrowed straight from `uri`, so no copy of the URI is made.
fn serialize_uri<S>(uri: &http::Uri, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    let serialized = SerializedUri {
        scheme: uri.scheme_str().map(Cow::Borrowed),
        authority: uri.authority().map(|a| Cow::Borrowed(a.as_str())),
        path_and_query: uri
            .path_and_query()
            .map(|pq| Cow::Borrowed(pq.as_str())),
    };
    serialized.serialize(serializer)
}
fn deserialize_uri<'de, D>(deserializer: D) -> Result<http::Uri, D::Error>
where
D: Deserializer<'de>,
{
let uri = SerializedUri::deserialize(deserializer)?;
let mut builder = http::Uri::builder();
if let Some(v) = uri.authority {
builder = builder.authority(v.deref());
}
if let Some(v) = uri.scheme {
builder = builder.scheme(v.deref());
}
if let Some(v) = uri.path_and_query {
builder = builder.path_and_query(v.deref());
}
builder.build().map_err(Error::custom)
}
/// Parses `uri` into an `http::Uri`.
///
/// Inputs starting with `unix://`, `windows:` or `file://` have the remainder of the string
/// encoded into the authority component (see `encode_uri_path_in_authority`) under the
/// `unix`, `windows` or `file` scheme respectively. Anything else is parsed as a regular URI.
///
/// # Errors
///
/// Returns an error if the encoded or plain URI cannot be constructed.
pub fn parse_uri(uri: &str) -> anyhow::Result<http::Uri> {
    // (prefix to strip, scheme to use), checked in this order.
    const PATH_SCHEMES: [(&str, &str); 3] = [
        ("unix://", "unix"),
        ("windows:", "windows"),
        ("file://", "file"),
    ];

    let path_match = PATH_SCHEMES
        .iter()
        .find_map(|&(prefix, scheme)| uri.strip_prefix(prefix).map(|path| (scheme, path)));

    match path_match {
        Some((scheme, path)) => encode_uri_path_in_authority(scheme, path),
        None => {
            let parsed = http::Uri::from_str(uri)?;
            Ok(parsed)
        }
    }
}
/// Builds a URI of the form `<scheme>://<hex(path)>`, storing `path` hex-encoded in the
/// authority component so that arbitrary filesystem paths survive URI parsing.
///
/// # Errors
///
/// Returns an error if `scheme` is not a valid URI scheme, if the hex-encoded path is not a
/// valid authority (e.g. `path` is empty), or if the parts cannot be assembled into a URI.
/// Parse failures are propagated directly instead of being discarded, so the reported error
/// names the component that was actually invalid.
fn encode_uri_path_in_authority(scheme: &str, path: &str) -> anyhow::Result<http::Uri> {
    let parsed_scheme = uri::Scheme::from_str(scheme)
        .with_context(|| format!("invalid uri scheme {:?}", scheme))?;

    let encoded_path = hex::encode(path);
    let authority = uri::Authority::from_str(encoded_path.as_str()).with_context(|| {
        format!(
            "cannot encode path {:?} as the authority of a {:?} uri",
            path, scheme
        )
    })?;

    let mut parts = uri::Parts::default();
    parts.scheme = Some(parsed_scheme);
    parts.authority = Some(authority);
    parts.path_and_query = Some(uri::PathAndQuery::from_static(""));
    Ok(http::Uri::from_parts(parts)?)
}
/// Recovers the filesystem path that was hex-encoded into the authority of `uri`.
///
/// On Unix the decoded bytes are used as-is; on other platforms they must be valid UTF-8.
///
/// # Errors
///
/// Returns an error if the URI has no authority, if the authority is not valid hex, or
/// (non-Unix only) if the decoded bytes are not UTF-8.
pub fn decode_uri_path_in_authority(uri: &http::Uri) -> anyhow::Result<PathBuf> {
    let authority = uri.authority().context("missing uri authority")?;
    let raw_path = hex::decode(authority.as_str())?;
    #[cfg(unix)]
    {
        use std::os::unix::ffi::OsStringExt;
        let os_path = std::ffi::OsString::from_vec(raw_path);
        Ok(PathBuf::from(os_path))
    }
    #[cfg(not(unix))]
    {
        String::from_utf8(raw_path)
            .map(|text| PathBuf::from(text.as_str()))
            .map_err(|_| anyhow::anyhow!("file uri should be utf-8"))
    }
}
impl Endpoint {
    /// Default timeout, in milliseconds.
    pub const DEFAULT_TIMEOUT: u64 = 3_000;

    /// Returns the optional headers (`dd-api-key`, `x-datadog-test-session-token`) for the
    /// values that are set on this endpoint, as `(name, value)` pairs.
    pub fn get_optional_headers(&self) -> impl Iterator<Item = (&'static str, &str)> {
        [
            self.api_key.as_ref().map(|v| ("dd-api-key", v.as_ref())),
            self.test_token
                .as_ref()
                .map(|v| ("x-datadog-test-session-token", v.as_ref())),
        ]
        .into_iter()
        .flatten()
    }

    /// Adds the standard headers to `builder` and returns it.
    ///
    /// The headers are, in order: `user-agent`, the optional headers from
    /// [`Endpoint::get_optional_headers`], and the entity headers from
    /// `entity_id::get_entity_headers()`.
    pub fn set_standard_headers(
        &self,
        mut builder: http::request::Builder,
        user_agent: &str,
    ) -> http::request::Builder {
        builder = builder.header(http::header::USER_AGENT, user_agent);
        for (name, value) in self.get_optional_headers() {
            builder = builder.header(name, value);
        }
        for (name, value) in entity_id::get_entity_headers() {
            builder = builder.header(name, value);
        }
        builder
    }

    /// Returns a request builder targeting this endpoint's URL with the standard headers set.
    ///
    /// Header assembly is delegated to [`Endpoint::set_standard_headers`] so both entry points
    /// always emit the same set of headers.
    pub fn to_request_builder(&self, user_agent: &str) -> anyhow::Result<HttpRequestBuilder> {
        let builder = http::Request::builder().uri(self.url.clone());
        Ok(self.set_standard_headers(builder, user_agent))
    }

    /// Builds an endpoint from a URL string, using defaults for every other field.
    ///
    /// # Panics
    ///
    /// Panics if `url` cannot be parsed by [`parse_uri`].
    #[inline]
    pub fn from_slice(url: &str) -> Endpoint {
        Endpoint {
            #[allow(clippy::unwrap_used)]
            url: parse_uri(url).unwrap(),
            ..Default::default()
        }
    }

    /// Builds an endpoint from an already-parsed URI, using defaults for every other field.
    #[inline]
    pub fn from_url(url: http::Uri) -> Endpoint {
        Endpoint {
            url,
            ..Default::default()
        }
    }

    /// Returns `true` when the endpoint URL uses the `file` scheme.
    pub fn is_file_endpoint(&self) -> bool {
        self.url.scheme_str() == Some("file")
    }

    /// Sets the timeout in milliseconds; `0` selects [`Endpoint::DEFAULT_TIMEOUT`].
    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
        self.timeout_ms = if timeout_ms == 0 {
            Self::DEFAULT_TIMEOUT
        } else {
            timeout_ms
        };
        self
    }

    /// Sets whether the system DNS resolver should be used.
    pub fn with_system_resolver(mut self, use_system_resolver: bool) -> Self {
        self.use_system_resolver = use_system_resolver;
        self
    }

    /// Creates a `reqwest::ClientBuilder` configured for this endpoint, together with the URL
    /// requests should be sent to.
    ///
    /// The builder gets this endpoint's timeout, has proxies disabled, and uses hickory DNS
    /// unless `use_system_resolver` is set. Per scheme:
    ///
    /// * `http` / `https`: the endpoint URL is used unchanged.
    /// * `file`: the output path is decoded from the URI, a dump server is spawned for it, the
    ///   builder is pointed at the returned socket (Unix) or named pipe (Windows), and the
    ///   request URL is `http://localhost/`.
    /// * `unix` (Unix only) / `windows` (Windows only): the builder is pointed at the socket or
    ///   named pipe from the URI, and the request URL is `http://localhost` followed by the
    ///   URI path.
    ///
    /// # Errors
    ///
    /// Returns an error for any other scheme, or if the path cannot be decoded or the dump
    /// server cannot be spawned.
    #[cfg(feature = "reqwest")]
    pub fn to_reqwest_client_builder(&self) -> anyhow::Result<(reqwest::ClientBuilder, String)> {
        let mut builder = reqwest::Client::builder()
            .timeout(std::time::Duration::from_millis(self.timeout_ms))
            .hickory_dns(!self.use_system_resolver)
            .no_proxy();

        let request_url = match self.url.scheme_str() {
            Some("http") | Some("https") => self.url.to_string(),
            Some("file") => {
                let output_path = decode_uri_path_in_authority(&self.url)
                    .context("Failed to decode file path from URI")?;
                let socket_or_pipe_path = dump_server::spawn_dump_server(output_path)?;

                #[cfg(unix)]
                {
                    builder = builder.unix_socket(socket_or_pipe_path);
                }
                #[cfg(windows)]
                {
                    builder = builder
                        .windows_named_pipe(socket_or_pipe_path.to_string_lossy().to_string());
                }

                "http://localhost/".to_string()
            }
            #[cfg(unix)]
            Some("unix") => {
                use connector::uds::socket_path_from_uri;
                let socket_path = socket_path_from_uri(&self.url)?;
                builder = builder.unix_socket(socket_path);
                format!("http://localhost{}", self.url.path())
            }
            #[cfg(windows)]
            Some("windows") => {
                use connector::named_pipe::named_pipe_path_from_uri;
                let pipe_path = named_pipe_path_from_uri(&self.url)?;
                builder = builder.windows_named_pipe(pipe_path.to_string_lossy().to_string());
                format!("http://localhost{}", self.url.path())
            }
            scheme => anyhow::bail!("Unsupported endpoint scheme: {:?}", scheme),
        };

        Ok((builder, request_url))
    }
}