libdd_common/
lib.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3#![cfg_attr(not(test), deny(clippy::panic))]
4#![cfg_attr(not(test), deny(clippy::unwrap_used))]
5#![cfg_attr(not(test), deny(clippy::expect_used))]
6#![cfg_attr(not(test), deny(clippy::todo))]
7#![cfg_attr(not(test), deny(clippy::unimplemented))]
8
9use hyper::{
10    header::HeaderValue,
11    http::uri::{self},
12};
13use serde::de::Error;
14use serde::{Deserialize, Deserializer, Serialize, Serializer};
15use std::sync::{Mutex, MutexGuard};
16use std::{borrow::Cow, ops::Deref, path::PathBuf, str::FromStr};
17
18pub mod azure_app_services;
19pub mod cc_utils;
20pub mod connector;
21pub mod entity_id;
22#[macro_use]
23pub mod cstr;
24pub mod config;
25pub mod error;
26pub mod hyper_migration;
27pub mod rate_limiter;
28pub mod tag;
29pub mod timeout;
30pub mod unix_utils;
31pub mod worker;
32
33/// Extension trait for `Mutex` to provide a method that acquires a lock, panicking if the lock is
34/// poisoned.
35///
36/// This helper function is intended to be used to avoid having to add many
37/// `#[allow(clippy::unwrap_used)]` annotations if there are a lot of usages of `Mutex`.
38///
39/// # Arguments
40///
41/// * `self` - A reference to the `Mutex` to lock.
42///
43/// # Returns
44///
45/// A `MutexGuard` that provides access to the locked data.
46///
47/// # Panics
48///
49/// This function will panic if the `Mutex` is poisoned.
50///
51/// # Examples
52///
53/// ```
54/// use libdd_common::MutexExt;
55/// use std::sync::{Arc, Mutex};
56///
57/// let data = Arc::new(Mutex::new(5));
58/// let data_clone = Arc::clone(&data);
59///
60/// std::thread::spawn(move || {
61///     let mut num = data_clone.lock_or_panic();
62///     *num += 1;
63/// })
64/// .join()
65/// .expect("Thread panicked");
66///
67/// assert_eq!(*data.lock_or_panic(), 6);
68/// ```
69pub trait MutexExt<T> {
70    fn lock_or_panic(&self) -> MutexGuard<'_, T>;
71}
72
73impl<T> MutexExt<T> for Mutex<T> {
74    #[inline(always)]
75    #[track_caller]
76    fn lock_or_panic(&self) -> MutexGuard<'_, T> {
77        #[allow(clippy::unwrap_used)]
78        self.lock().unwrap()
79    }
80}
81
82pub mod header {
83    #![allow(clippy::declare_interior_mutable_const)]
84    use hyper::{header::HeaderName, http::HeaderValue};
85
86    // These strings are defined separately to be used in context where &str are used to represent
87    // headers (e.g. SendData) while keeping a single source of truth.
88    pub const DATADOG_SEND_REAL_HTTP_STATUS_STR: &str = "datadog-send-real-http-status";
89    pub const DATADOG_TRACE_COUNT_STR: &str = "x-datadog-trace-count";
90    pub const APPLICATION_MSGPACK_STR: &str = "application/msgpack";
91    pub const APPLICATION_PROTOBUF_STR: &str = "application/x-protobuf";
92
93    pub const DATADOG_CONTAINER_ID: HeaderName = HeaderName::from_static("datadog-container-id");
94    pub const DATADOG_ENTITY_ID: HeaderName = HeaderName::from_static("datadog-entity-id");
95    pub const DATADOG_EXTERNAL_ENV: HeaderName = HeaderName::from_static("datadog-external-env");
96    pub const DATADOG_TRACE_COUNT: HeaderName = HeaderName::from_static("x-datadog-trace-count");
97    /// Signal to the agent to send 429 responses when a payload is dropped
98    /// If this is not set then the agent will always return a 200 regardless if the payload is
99    /// dropped.
100    pub const DATADOG_SEND_REAL_HTTP_STATUS: HeaderName =
101        HeaderName::from_static(DATADOG_SEND_REAL_HTTP_STATUS_STR);
102    pub const DATADOG_API_KEY: HeaderName = HeaderName::from_static("dd-api-key");
103    pub const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");
104    pub const APPLICATION_MSGPACK: HeaderValue = HeaderValue::from_static(APPLICATION_MSGPACK_STR);
105    pub const APPLICATION_PROTOBUF: HeaderValue =
106        HeaderValue::from_static(APPLICATION_PROTOBUF_STR);
107    pub const X_DATADOG_TEST_SESSION_TOKEN: HeaderName =
108        HeaderName::from_static("x-datadog-test-session-token");
109}
110
111pub type HttpClient = hyper_migration::GenericHttpClient<connector::Connector>;
112pub type GenericHttpClient<C> = hyper_migration::GenericHttpClient<C>;
113pub type HttpResponse = hyper_migration::HttpResponse;
114pub type HttpRequestBuilder = hyper::http::request::Builder;
115pub trait Connect:
116    hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
117{
118}
119impl<C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static> Connect
120    for C
121{
122}
123
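// `DD_EXTERNAL_ENV` is read by `Endpoint::to_request_builder`; the `const_format` re-export
// below is the item used by the tag! macro.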
125use crate::entity_id::DD_EXTERNAL_ENV;
126pub use const_format;
127
128#[derive(Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
129pub struct Endpoint {
130    #[serde(serialize_with = "serialize_uri", deserialize_with = "deserialize_uri")]
131    pub url: hyper::Uri,
132    pub api_key: Option<Cow<'static, str>>,
133    pub timeout_ms: u64,
134    /// Sets X-Datadog-Test-Session-Token header on any request
135    pub test_token: Option<Cow<'static, str>>,
136}
137
138impl Default for Endpoint {
139    fn default() -> Self {
140        Endpoint {
141            url: hyper::Uri::default(),
142            api_key: None,
143            timeout_ms: Self::DEFAULT_TIMEOUT,
144            test_token: None,
145        }
146    }
147}
148
149#[derive(serde::Deserialize, serde::Serialize)]
150struct SerializedUri<'a> {
151    scheme: Option<Cow<'a, str>>,
152    authority: Option<Cow<'a, str>>,
153    path_and_query: Option<Cow<'a, str>>,
154}
155
156fn serialize_uri<S>(uri: &hyper::Uri, serializer: S) -> Result<S::Ok, S::Error>
157where
158    S: Serializer,
159{
160    let parts = uri.clone().into_parts();
161    let uri = SerializedUri {
162        scheme: parts.scheme.as_ref().map(|s| Cow::Borrowed(s.as_str())),
163        authority: parts.authority.as_ref().map(|s| Cow::Borrowed(s.as_str())),
164        path_and_query: parts
165            .path_and_query
166            .as_ref()
167            .map(|s| Cow::Borrowed(s.as_str())),
168    };
169    uri.serialize(serializer)
170}
171
172fn deserialize_uri<'de, D>(deserializer: D) -> Result<hyper::Uri, D::Error>
173where
174    D: Deserializer<'de>,
175{
176    let uri = SerializedUri::deserialize(deserializer)?;
177    let mut builder = hyper::Uri::builder();
178    if let Some(v) = uri.authority {
179        builder = builder.authority(v.deref());
180    }
181    if let Some(v) = uri.scheme {
182        builder = builder.scheme(v.deref());
183    }
184    if let Some(v) = uri.path_and_query {
185        builder = builder.path_and_query(v.deref());
186    }
187
188    builder.build().map_err(Error::custom)
189}
190
191/// TODO: we should properly handle malformed urls
192/// * For windows and unix schemes:
193///     * For compatibility reasons with existing implementation this parser stores the encoded path
194///       in authority section as there is no existing standard [see](https://github.com/whatwg/url/issues/577)
195///       that covers this. We need to pick one hack or another
196///     * For windows, interprets everything after windows: as path
197///     * For unix, interprets everything after unix:// as path
198/// * For file scheme implementation will simply backfill missing authority section
199pub fn parse_uri(uri: &str) -> anyhow::Result<hyper::Uri> {
200    if let Some(path) = uri.strip_prefix("unix://") {
201        encode_uri_path_in_authority("unix", path)
202    } else if let Some(path) = uri.strip_prefix("windows:") {
203        encode_uri_path_in_authority("windows", path)
204    } else if let Some(path) = uri.strip_prefix("file://") {
205        encode_uri_path_in_authority("file", path)
206    } else {
207        Ok(hyper::Uri::from_str(uri)?)
208    }
209}
210
211fn encode_uri_path_in_authority(scheme: &str, path: &str) -> anyhow::Result<hyper::Uri> {
212    let mut parts = uri::Parts::default();
213    parts.scheme = uri::Scheme::from_str(scheme).ok();
214
215    let path = hex::encode(path);
216
217    parts.authority = uri::Authority::from_str(path.as_str()).ok();
218    parts.path_and_query = Some(uri::PathAndQuery::from_static(""));
219    Ok(hyper::Uri::from_parts(parts)?)
220}
221
222pub fn decode_uri_path_in_authority(uri: &hyper::Uri) -> anyhow::Result<PathBuf> {
223    let path = hex::decode(
224        uri.authority()
225            .ok_or_else(|| anyhow::anyhow!("missing uri authority"))?
226            .as_str(),
227    )?;
228    #[cfg(unix)]
229    {
230        use std::os::unix::ffi::OsStringExt;
231        Ok(PathBuf::from(std::ffi::OsString::from_vec(path)))
232    }
233    #[cfg(not(unix))]
234    {
235        match String::from_utf8(path) {
236            Ok(s) => Ok(PathBuf::from(s.as_str())),
237            _ => Err(anyhow::anyhow!("file uri should be utf-8")),
238        }
239    }
240}
241
242impl Endpoint {
243    /// Default value for the timeout field in milliseconds.
244    pub const DEFAULT_TIMEOUT: u64 = 3_000;
245
246    /// Return a request builder with the following headers:
247    /// - User agent
248    /// - Api key
249    /// - Container Id/Entity Id
250    pub fn to_request_builder(&self, user_agent: &str) -> anyhow::Result<HttpRequestBuilder> {
251        let mut builder = hyper::Request::builder()
252            .uri(self.url.clone())
253            .header(hyper::header::USER_AGENT, user_agent);
254
255        // Add the Api key header if available
256        if let Some(api_key) = &self.api_key {
257            builder = builder.header(header::DATADOG_API_KEY, HeaderValue::from_str(api_key)?);
258        }
259
260        // Add the test session token if available
261        if let Some(token) = &self.test_token {
262            builder = builder.header(
263                header::X_DATADOG_TEST_SESSION_TOKEN,
264                HeaderValue::from_str(token)?,
265            );
266        }
267
268        // Add the Container Id header if available
269        if let Some(container_id) = entity_id::get_container_id() {
270            builder = builder.header(header::DATADOG_CONTAINER_ID, container_id);
271        }
272
273        // Add the Entity Id header if available
274        if let Some(entity_id) = entity_id::get_entity_id() {
275            builder = builder.header(header::DATADOG_ENTITY_ID, entity_id);
276        }
277
278        // Add the External Env header if available
279        if let Some(external_env) = *DD_EXTERNAL_ENV {
280            builder = builder.header(header::DATADOG_EXTERNAL_ENV, external_env);
281        }
282
283        Ok(builder)
284    }
285
286    #[inline]
287    pub fn from_slice(url: &str) -> Endpoint {
288        Endpoint {
289            #[allow(clippy::unwrap_used)]
290            url: parse_uri(url).unwrap(),
291            ..Default::default()
292        }
293    }
294
295    #[inline]
296    pub fn from_url(url: hyper::Uri) -> Endpoint {
297        Endpoint {
298            url,
299            ..Default::default()
300        }
301    }
302
303    pub fn is_file_endpoint(&self) -> bool {
304        self.url.scheme_str() == Some("file")
305    }
306
307    /// Set a custom timeout for this endpoint.
308    /// If not called, uses the default timeout of 3000ms.
309    ///
310    /// # Arguments
311    /// * `timeout_ms` - Timeout in milliseconds. Pass 0 to use the default timeout (3000ms).
312    ///
313    /// # Returns
314    /// Self with the timeout set, allowing for method chaining
315    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
316        self.timeout_ms = if timeout_ms == 0 {
317            Self::DEFAULT_TIMEOUT
318        } else {
319            timeout_ms
320        };
321        self
322    }
323}