Skip to main content

libdd_common/
lib.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3#![cfg_attr(not(test), deny(clippy::panic))]
4#![cfg_attr(not(test), deny(clippy::unwrap_used))]
5#![cfg_attr(not(test), deny(clippy::expect_used))]
6#![cfg_attr(not(test), deny(clippy::todo))]
7#![cfg_attr(not(test), deny(clippy::unimplemented))]
8
9use anyhow::Context;
10use hyper::http::uri;
11use serde::de::Error;
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13use std::sync::{Mutex, MutexGuard};
14use std::{borrow::Cow, ops::Deref, path::PathBuf, str::FromStr};
15
16pub mod azure_app_services;
17pub mod cc_utils;
18pub mod connector;
19#[cfg(feature = "reqwest")]
20pub mod dump_server;
21pub mod entity_id;
22#[macro_use]
23pub mod cstr;
24pub mod config;
25pub mod error;
26pub mod http_common;
27pub mod rate_limiter;
28pub mod tag;
29#[cfg(any(test, feature = "test-utils"))]
30pub mod test_utils;
31pub mod threading;
32pub mod timeout;
33pub mod unix_utils;
34pub mod worker;
35
/// Convenience extension for [`Mutex`] that locks it and treats poisoning as a fatal error.
///
/// Many call sites in this crate lock a `Mutex` and have no sensible way to recover from a
/// poisoned lock. Routing them through this helper keeps the single
/// `#[allow(clippy::unwrap_used)]` in one place instead of sprinkling it over every call site.
///
/// # Arguments
///
/// * `self` - The `Mutex` to lock.
///
/// # Returns
///
/// The `MutexGuard` granting access to the protected data.
///
/// # Panics
///
/// Panics if the `Mutex` is poisoned, i.e. a thread panicked while holding the lock. Thanks to
/// `#[track_caller]` the panic is attributed to the caller's location.
///
/// # Examples
///
/// ```
/// use libdd_common::MutexExt;
/// use std::sync::{Arc, Mutex};
///
/// let counter = Arc::new(Mutex::new(5));
/// let worker_counter = Arc::clone(&counter);
///
/// std::thread::spawn(move || {
///     *worker_counter.lock_or_panic() += 1;
/// })
/// .join()
/// .expect("Thread panicked");
///
/// assert_eq!(*counter.lock_or_panic(), 6);
/// ```
pub trait MutexExt<T> {
    /// Locks the mutex, panicking if it is poisoned.
    fn lock_or_panic(&self) -> MutexGuard<'_, T>;
}

impl<T> MutexExt<T> for Mutex<T> {
    #[inline(always)]
    #[track_caller]
    // Poisoning is intentionally fatal here; see the trait documentation.
    #[allow(clippy::unwrap_used)]
    fn lock_or_panic(&self) -> MutexGuard<'_, T> {
        let lock_result = self.lock();
        lock_result.unwrap()
    }
}
84
85pub mod header {
86    #![allow(clippy::declare_interior_mutable_const)]
87    use hyper::{header::HeaderName, http::HeaderValue};
88
89    // These strings are defined separately to be used in context where &str are used to represent
90    // headers (e.g. SendData) while keeping a single source of truth.
91    pub const DATADOG_SEND_REAL_HTTP_STATUS_STR: &str = "datadog-send-real-http-status";
92    pub const DATADOG_TRACE_COUNT_STR: &str = "x-datadog-trace-count";
93    pub const APPLICATION_MSGPACK_STR: &str = "application/msgpack";
94    pub const APPLICATION_PROTOBUF_STR: &str = "application/x-protobuf";
95
96    pub const DATADOG_CONTAINER_ID: HeaderName = HeaderName::from_static("datadog-container-id");
97    pub const DATADOG_ENTITY_ID: HeaderName = HeaderName::from_static("datadog-entity-id");
98    pub const DATADOG_EXTERNAL_ENV: HeaderName = HeaderName::from_static("datadog-external-env");
99    pub const DATADOG_TRACE_COUNT: HeaderName = HeaderName::from_static("x-datadog-trace-count");
100    /// Signal to the agent to send 429 responses when a payload is dropped
101    /// If this is not set then the agent will always return a 200 regardless if the payload is
102    /// dropped.
103    pub const DATADOG_SEND_REAL_HTTP_STATUS: HeaderName =
104        HeaderName::from_static(DATADOG_SEND_REAL_HTTP_STATUS_STR);
105    pub const DATADOG_API_KEY: HeaderName = HeaderName::from_static("dd-api-key");
106    pub const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");
107    pub const APPLICATION_MSGPACK: HeaderValue = HeaderValue::from_static(APPLICATION_MSGPACK_STR);
108    pub const APPLICATION_PROTOBUF: HeaderValue =
109        HeaderValue::from_static(APPLICATION_PROTOBUF_STR);
110    pub const X_DATADOG_TEST_SESSION_TOKEN: HeaderName =
111        HeaderName::from_static("x-datadog-test-session-token");
112}
113
114pub type HttpClient = http_common::GenericHttpClient<connector::Connector>;
115pub type GenericHttpClient<C> = http_common::GenericHttpClient<C>;
116pub type HttpResponse = http_common::HttpResponse;
117pub type HttpRequestBuilder = hyper::http::request::Builder;
118pub trait Connect:
119    hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
120{
121}
122impl<C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static> Connect
123    for C
124{
125}
126
127// Used by tag! macro
128pub use const_format;
129
130#[derive(Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
131pub struct Endpoint {
132    #[serde(serialize_with = "serialize_uri", deserialize_with = "deserialize_uri")]
133    pub url: hyper::Uri,
134    pub api_key: Option<Cow<'static, str>>,
135    pub timeout_ms: u64,
136    /// Sets X-Datadog-Test-Session-Token header on any request
137    pub test_token: Option<Cow<'static, str>>,
138    /// Use the system DNS resolver when building the HTTP client. If false, the default
139    /// in-process resolver is used.
140    #[serde(default)]
141    pub use_system_resolver: bool,
142}
143
144impl Default for Endpoint {
145    fn default() -> Self {
146        Endpoint {
147            url: hyper::Uri::default(),
148            api_key: None,
149            timeout_ms: Self::DEFAULT_TIMEOUT,
150            test_token: None,
151            use_system_resolver: false,
152        }
153    }
154}
155
156#[derive(serde::Deserialize, serde::Serialize)]
157struct SerializedUri<'a> {
158    scheme: Option<Cow<'a, str>>,
159    authority: Option<Cow<'a, str>>,
160    path_and_query: Option<Cow<'a, str>>,
161}
162
163fn serialize_uri<S>(uri: &hyper::Uri, serializer: S) -> Result<S::Ok, S::Error>
164where
165    S: Serializer,
166{
167    let parts = uri.clone().into_parts();
168    let uri = SerializedUri {
169        scheme: parts.scheme.as_ref().map(|s| Cow::Borrowed(s.as_str())),
170        authority: parts.authority.as_ref().map(|s| Cow::Borrowed(s.as_str())),
171        path_and_query: parts
172            .path_and_query
173            .as_ref()
174            .map(|s| Cow::Borrowed(s.as_str())),
175    };
176    uri.serialize(serializer)
177}
178
179fn deserialize_uri<'de, D>(deserializer: D) -> Result<hyper::Uri, D::Error>
180where
181    D: Deserializer<'de>,
182{
183    let uri = SerializedUri::deserialize(deserializer)?;
184    let mut builder = hyper::Uri::builder();
185    if let Some(v) = uri.authority {
186        builder = builder.authority(v.deref());
187    }
188    if let Some(v) = uri.scheme {
189        builder = builder.scheme(v.deref());
190    }
191    if let Some(v) = uri.path_and_query {
192        builder = builder.path_and_query(v.deref());
193    }
194
195    builder.build().map_err(Error::custom)
196}
197
198/// TODO: we should properly handle malformed urls
199/// * For windows and unix schemes:
200///     * For compatibility reasons with existing implementation this parser stores the encoded path
201///       in authority section as there is no existing standard [see](https://github.com/whatwg/url/issues/577)
202///       that covers this. We need to pick one hack or another
203///     * For windows, interprets everything after windows: as path
204///     * For unix, interprets everything after unix:// as path
205/// * For file scheme implementation will simply backfill missing authority section
206pub fn parse_uri(uri: &str) -> anyhow::Result<hyper::Uri> {
207    if let Some(path) = uri.strip_prefix("unix://") {
208        encode_uri_path_in_authority("unix", path)
209    } else if let Some(path) = uri.strip_prefix("windows:") {
210        encode_uri_path_in_authority("windows", path)
211    } else if let Some(path) = uri.strip_prefix("file://") {
212        encode_uri_path_in_authority("file", path)
213    } else {
214        Ok(hyper::Uri::from_str(uri)?)
215    }
216}
217
218fn encode_uri_path_in_authority(scheme: &str, path: &str) -> anyhow::Result<hyper::Uri> {
219    let mut parts = uri::Parts::default();
220    parts.scheme = uri::Scheme::from_str(scheme).ok();
221
222    let path = hex::encode(path);
223
224    parts.authority = uri::Authority::from_str(path.as_str()).ok();
225    parts.path_and_query = Some(uri::PathAndQuery::from_static(""));
226    Ok(hyper::Uri::from_parts(parts)?)
227}
228
229pub fn decode_uri_path_in_authority(uri: &hyper::Uri) -> anyhow::Result<PathBuf> {
230    let path = hex::decode(uri.authority().context("missing uri authority")?.as_str())?;
231    #[cfg(unix)]
232    {
233        use std::os::unix::ffi::OsStringExt;
234        Ok(PathBuf::from(std::ffi::OsString::from_vec(path)))
235    }
236    #[cfg(not(unix))]
237    {
238        match String::from_utf8(path) {
239            Ok(s) => Ok(PathBuf::from(s.as_str())),
240            _ => Err(anyhow::anyhow!("file uri should be utf-8")),
241        }
242    }
243}
244
245impl Endpoint {
246    /// Default value for the timeout field in milliseconds.
247    pub const DEFAULT_TIMEOUT: u64 = 3_000;
248
249    /// Returns an iterator of optional endpoint-specific headers (api-key, test-token)
250    /// as (header_name, header_value) string tuples for any that are available.
251    pub fn get_optional_headers(&self) -> impl Iterator<Item = (&'static str, &str)> {
252        [
253            self.api_key.as_ref().map(|v| ("dd-api-key", v.as_ref())),
254            self.test_token
255                .as_ref()
256                .map(|v| ("x-datadog-test-session-token", v.as_ref())),
257        ]
258        .into_iter()
259        .flatten()
260    }
261
262    /// Return a request builder with the following headers:
263    /// - User agent
264    /// - Api key
265    /// - Container Id/Entity Id
266    pub fn to_request_builder(&self, user_agent: &str) -> anyhow::Result<HttpRequestBuilder> {
267        let mut builder = hyper::Request::builder()
268            .uri(self.url.clone())
269            .header(hyper::header::USER_AGENT, user_agent);
270
271        // Add optional endpoint headers (api-key, test-token)
272        for (name, value) in self.get_optional_headers() {
273            builder = builder.header(name, value);
274        }
275
276        // Add entity-related headers (container-id, entity-id, external-env)
277        for (name, value) in entity_id::get_entity_headers() {
278            builder = builder.header(name, value);
279        }
280
281        Ok(builder)
282    }
283
284    #[inline]
285    pub fn from_slice(url: &str) -> Endpoint {
286        Endpoint {
287            #[allow(clippy::unwrap_used)]
288            url: parse_uri(url).unwrap(),
289            ..Default::default()
290        }
291    }
292
293    #[inline]
294    pub fn from_url(url: hyper::Uri) -> Endpoint {
295        Endpoint {
296            url,
297            ..Default::default()
298        }
299    }
300
301    pub fn is_file_endpoint(&self) -> bool {
302        self.url.scheme_str() == Some("file")
303    }
304
305    /// Set a custom timeout for this endpoint.
306    /// If not called, uses the default timeout of 3000ms.
307    ///
308    /// # Arguments
309    /// * `timeout_ms` - Timeout in milliseconds. Pass 0 to use the default timeout (3000ms).
310    ///
311    /// # Returns
312    /// Self with the timeout set, allowing for method chaining
313    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
314        self.timeout_ms = if timeout_ms == 0 {
315            Self::DEFAULT_TIMEOUT
316        } else {
317            timeout_ms
318        };
319        self
320    }
321
322    /// Use the system DNS resolver when building the reqwest client. Only has effect for
323    /// HTTP(S) endpoints.
324    pub fn with_system_resolver(mut self, use_system_resolver: bool) -> Self {
325        self.use_system_resolver = use_system_resolver;
326        self
327    }
328
329    /// Creates a reqwest ClientBuilder configured for this endpoint.
330    ///
331    /// This method handles various endpoint schemes:
332    /// - `http`/`https`: Standard HTTP(S) endpoints
333    /// - `unix`: Unix domain sockets (Unix only)
334    /// - `windows`: Windows named pipes (Windows only)
335    /// - `file`: File dump endpoints for debugging (spawns a local server to capture requests)
336    ///
337    /// The default in-process resolver is used for DNS (fork-safe). To use the system DNS resolver
338    /// instead (less fork-safe), set [`Endpoint::use_system_resolver`] to true via
339    /// [`Endpoint::with_system_resolver`].
340    ///
341    /// # Returns
342    /// A tuple of (ClientBuilder, request_url) where:
343    /// - ClientBuilder is configured with the appropriate transport and timeout
344    /// - request_url is the URL string to use for HTTP requests
345    ///
346    /// # Errors
347    /// Returns an error if:
348    /// - The endpoint scheme is unsupported
349    /// - Path decoding fails
350    /// - The dump server fails to start (for file:// scheme)
351    #[cfg(feature = "reqwest")]
352    pub fn to_reqwest_client_builder(&self) -> anyhow::Result<(reqwest::ClientBuilder, String)> {
353        use anyhow::Context;
354
355        let mut builder = reqwest::Client::builder()
356            .timeout(std::time::Duration::from_millis(self.timeout_ms))
357            .hickory_dns(!self.use_system_resolver);
358
359        let request_url = match self.url.scheme_str() {
360            // HTTP/HTTPS endpoints
361            Some("http") | Some("https") => self.url.to_string(),
362
363            // File dump endpoint (debugging) - uses platform-specific local transport
364            Some("file") => {
365                let output_path = decode_uri_path_in_authority(&self.url)
366                    .context("Failed to decode file path from URI")?;
367                let socket_or_pipe_path = dump_server::spawn_dump_server(output_path)?;
368
369                // Configure the client to use the local socket/pipe
370                #[cfg(unix)]
371                {
372                    builder = builder.unix_socket(socket_or_pipe_path);
373                }
374                #[cfg(windows)]
375                {
376                    builder = builder
377                        .windows_named_pipe(socket_or_pipe_path.to_string_lossy().to_string());
378                }
379
380                "http://localhost/".to_string()
381            }
382
383            // Unix domain sockets
384            #[cfg(unix)]
385            Some("unix") => {
386                use connector::uds::socket_path_from_uri;
387                let socket_path = socket_path_from_uri(&self.url)?;
388                builder = builder.unix_socket(socket_path);
389                format!("http://localhost{}", self.url.path())
390            }
391
392            // Windows named pipes
393            #[cfg(windows)]
394            Some("windows") => {
395                use connector::named_pipe::named_pipe_path_from_uri;
396                let pipe_path = named_pipe_path_from_uri(&self.url)?;
397                builder = builder.windows_named_pipe(pipe_path.to_string_lossy().to_string());
398                format!("http://localhost{}", self.url.path())
399            }
400
401            // Unsupported schemes
402            scheme => anyhow::bail!("Unsupported endpoint scheme: {:?}", scheme),
403        };
404
405        Ok((builder, request_url))
406    }
407}