Skip to main content

libdd_common/
lib.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3#![cfg_attr(not(test), deny(clippy::panic))]
4#![cfg_attr(not(test), deny(clippy::unwrap_used))]
5#![cfg_attr(not(test), deny(clippy::expect_used))]
6#![cfg_attr(not(test), deny(clippy::todo))]
7#![cfg_attr(not(test), deny(clippy::unimplemented))]
8
9use anyhow::Context;
10use http::uri;
11use serde::de::Error;
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13use std::sync::{Mutex, MutexGuard};
14use std::{borrow::Cow, ops::Deref, path::PathBuf, str::FromStr};
15
16pub mod azure_app_services;
17#[cfg(not(target_arch = "wasm32"))]
18pub mod cc_utils;
19#[cfg(not(target_arch = "wasm32"))]
20pub mod connector;
21#[cfg(feature = "reqwest")]
22pub mod dump_server;
23pub mod entity_id;
24#[macro_use]
25pub mod cstr;
26#[cfg(feature = "bench-utils")]
27pub mod bench_utils;
28pub mod config;
29pub mod error;
30pub mod http_common;
31pub mod multipart;
32#[cfg(not(target_arch = "wasm32"))]
33pub mod rate_limiter;
34pub mod tag;
35#[cfg(any(test, feature = "test-utils"))]
36pub mod test_utils;
37#[cfg(not(target_arch = "wasm32"))]
38pub mod threading;
39#[cfg(not(target_arch = "wasm32"))]
40pub mod timeout;
41pub mod unix_utils;
42
/// Convenience extension for [`std::sync::Mutex`] that locks and panics on poisoning.
///
/// The crate denies `clippy::unwrap_used` outside of tests. Routing lock acquisition through
/// this single helper keeps the one required `#[allow]` here instead of at every call site
/// that locks a `Mutex`.
///
/// # Returns
///
/// A [`MutexGuard`] granting access to the protected value until the guard is dropped.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
///
/// # Examples
///
/// ```
/// use libdd_common::MutexExt;
/// use std::sync::{Arc, Mutex};
///
/// let data = Arc::new(Mutex::new(5));
/// let data_clone = Arc::clone(&data);
///
/// std::thread::spawn(move || {
///     let mut num = data_clone.lock_or_panic();
///     *num += 1;
/// })
/// .join()
/// .expect("Thread panicked");
///
/// assert_eq!(*data.lock_or_panic(), 6);
/// ```
pub trait MutexExt<T> {
    /// Acquires the lock, panicking if the mutex has been poisoned.
    fn lock_or_panic(&self) -> MutexGuard<'_, T>;
}

impl<Inner> MutexExt<Inner> for Mutex<Inner> {
    #[inline(always)]
    #[track_caller]
    fn lock_or_panic(&self) -> MutexGuard<'_, Inner> {
        let lock_result = self.lock();
        // `unwrap` is `#[track_caller]`, and so is this method, so a poisoning panic is
        // reported at the caller of `lock_or_panic`.
        #[allow(clippy::unwrap_used)]
        lock_result.unwrap()
    }
}
91
92pub mod header {
93    #![allow(clippy::declare_interior_mutable_const)]
94    use http::{header::HeaderName, HeaderValue};
95
96    pub const APPLICATION_MSGPACK_STR: &str = "application/msgpack";
97    pub const APPLICATION_PROTOBUF_STR: &str = "application/x-protobuf";
98
99    pub const DATADOG_CONTAINER_ID: HeaderName = HeaderName::from_static("datadog-container-id");
100    pub const DATADOG_ENTITY_ID: HeaderName = HeaderName::from_static("datadog-entity-id");
101    pub const DATADOG_EXTERNAL_ENV: HeaderName = HeaderName::from_static("datadog-external-env");
102    pub const DATADOG_TRACE_COUNT: HeaderName = HeaderName::from_static("x-datadog-trace-count");
103    /// Signal to the agent to send 429 responses when a payload is dropped
104    /// If this is not set then the agent will always return a 200 regardless if the payload is
105    /// dropped.
106    pub const DATADOG_SEND_REAL_HTTP_STATUS: HeaderName =
107        HeaderName::from_static("datadog-send-real-http-status");
108    pub const DATADOG_API_KEY: HeaderName = HeaderName::from_static("dd-api-key");
109    pub const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");
110    pub const APPLICATION_MSGPACK: HeaderValue = HeaderValue::from_static(APPLICATION_MSGPACK_STR);
111    pub const APPLICATION_PROTOBUF: HeaderValue =
112        HeaderValue::from_static(APPLICATION_PROTOBUF_STR);
113    pub const X_DATADOG_TEST_SESSION_TOKEN: HeaderName =
114        HeaderName::from_static("x-datadog-test-session-token");
115}
116
117#[cfg(not(target_arch = "wasm32"))]
118pub type HttpClient = http_common::GenericHttpClient<connector::Connector>;
119#[cfg(not(target_arch = "wasm32"))]
120pub type HttpResponse = http_common::HttpResponse;
121pub type HttpRequestBuilder = http::request::Builder;
122#[cfg(not(target_arch = "wasm32"))]
123pub trait Connect:
124    hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
125{
126}
127#[cfg(not(target_arch = "wasm32"))]
128impl<C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static> Connect
129    for C
130{
131}
132
133// Used by tag! macro
134pub use const_format;
135
136#[derive(Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
137pub struct Endpoint {
138    #[serde(serialize_with = "serialize_uri", deserialize_with = "deserialize_uri")]
139    pub url: http::Uri,
140    pub api_key: Option<Cow<'static, str>>,
141    pub timeout_ms: u64,
142    /// Sets X-Datadog-Test-Session-Token header on any request
143    pub test_token: Option<Cow<'static, str>>,
144    /// Use the system DNS resolver when building the HTTP client. If false, the default
145    /// in-process resolver is used.
146    #[serde(default)]
147    pub use_system_resolver: bool,
148}
149
150impl Default for Endpoint {
151    fn default() -> Self {
152        Endpoint {
153            url: http::Uri::default(),
154            api_key: None,
155            timeout_ms: Self::DEFAULT_TIMEOUT,
156            test_token: None,
157            use_system_resolver: false,
158        }
159    }
160}
161
162#[derive(serde::Deserialize, serde::Serialize)]
163struct SerializedUri<'a> {
164    scheme: Option<Cow<'a, str>>,
165    authority: Option<Cow<'a, str>>,
166    path_and_query: Option<Cow<'a, str>>,
167}
168
169fn serialize_uri<S>(uri: &http::Uri, serializer: S) -> Result<S::Ok, S::Error>
170where
171    S: Serializer,
172{
173    let parts = uri.clone().into_parts();
174    let uri = SerializedUri {
175        scheme: parts.scheme.as_ref().map(|s| Cow::Borrowed(s.as_str())),
176        authority: parts.authority.as_ref().map(|s| Cow::Borrowed(s.as_str())),
177        path_and_query: parts
178            .path_and_query
179            .as_ref()
180            .map(|s| Cow::Borrowed(s.as_str())),
181    };
182    uri.serialize(serializer)
183}
184
185fn deserialize_uri<'de, D>(deserializer: D) -> Result<http::Uri, D::Error>
186where
187    D: Deserializer<'de>,
188{
189    let uri = SerializedUri::deserialize(deserializer)?;
190    let mut builder = http::Uri::builder();
191    if let Some(v) = uri.authority {
192        builder = builder.authority(v.deref());
193    }
194    if let Some(v) = uri.scheme {
195        builder = builder.scheme(v.deref());
196    }
197    if let Some(v) = uri.path_and_query {
198        builder = builder.path_and_query(v.deref());
199    }
200
201    builder.build().map_err(Error::custom)
202}
203
204/// TODO: we should properly handle malformed urls
205/// * For windows and unix schemes:
206///     * For compatibility reasons with existing implementation this parser stores the encoded path
207///       in authority section as there is no existing standard [see](https://github.com/whatwg/url/issues/577)
208///       that covers this. We need to pick one hack or another
209///     * For windows, interprets everything after windows: as path
210///     * For unix, interprets everything after unix:// as path
211/// * For file scheme implementation will simply backfill missing authority section
212pub fn parse_uri(uri: &str) -> anyhow::Result<http::Uri> {
213    if let Some(path) = uri.strip_prefix("unix://") {
214        encode_uri_path_in_authority("unix", path)
215    } else if let Some(path) = uri.strip_prefix("windows:") {
216        encode_uri_path_in_authority("windows", path)
217    } else if let Some(path) = uri.strip_prefix("file://") {
218        encode_uri_path_in_authority("file", path)
219    } else {
220        Ok(http::Uri::from_str(uri)?)
221    }
222}
223
224fn encode_uri_path_in_authority(scheme: &str, path: &str) -> anyhow::Result<http::Uri> {
225    let mut parts = uri::Parts::default();
226    parts.scheme = uri::Scheme::from_str(scheme).ok();
227
228    let path = hex::encode(path);
229
230    parts.authority = uri::Authority::from_str(path.as_str()).ok();
231    parts.path_and_query = Some(uri::PathAndQuery::from_static(""));
232    Ok(http::Uri::from_parts(parts)?)
233}
234
235pub fn decode_uri_path_in_authority(uri: &http::Uri) -> anyhow::Result<PathBuf> {
236    let path = hex::decode(uri.authority().context("missing uri authority")?.as_str())?;
237    #[cfg(unix)]
238    {
239        use std::os::unix::ffi::OsStringExt;
240        Ok(PathBuf::from(std::ffi::OsString::from_vec(path)))
241    }
242    #[cfg(not(unix))]
243    {
244        match String::from_utf8(path) {
245            Ok(s) => Ok(PathBuf::from(s.as_str())),
246            _ => Err(anyhow::anyhow!("file uri should be utf-8")),
247        }
248    }
249}
250
251impl Endpoint {
252    /// Default value for the timeout field in milliseconds.
253    pub const DEFAULT_TIMEOUT: u64 = 3_000;
254
255    /// Returns an iterator of optional endpoint-specific headers (api-key, test-token)
256    /// as (header_name, header_value) string tuples for any that are available.
257    pub fn get_optional_headers(&self) -> impl Iterator<Item = (&'static str, &str)> {
258        [
259            self.api_key.as_ref().map(|v| ("dd-api-key", v.as_ref())),
260            self.test_token
261                .as_ref()
262                .map(|v| ("x-datadog-test-session-token", v.as_ref())),
263        ]
264        .into_iter()
265        .flatten()
266    }
267
268    /// Apply standard headers (user-agent, api-key, test-token, entity headers) to an
269    /// [`http::request::Builder`].
270    pub fn set_standard_headers(
271        &self,
272        mut builder: http::request::Builder,
273        user_agent: &str,
274    ) -> http::request::Builder {
275        builder = builder.header("user-agent", user_agent);
276        for (name, value) in self.get_optional_headers() {
277            builder = builder.header(name, value);
278        }
279        for (name, value) in entity_id::get_entity_headers() {
280            builder = builder.header(name, value);
281        }
282        builder
283    }
284
285    /// Return a request builder with the following headers:
286    /// - User agent
287    /// - Api key
288    /// - Container Id/Entity Id
289    pub fn to_request_builder(&self, user_agent: &str) -> anyhow::Result<HttpRequestBuilder> {
290        let mut builder = http::Request::builder()
291            .uri(self.url.clone())
292            .header(http::header::USER_AGENT, user_agent);
293
294        // Add optional endpoint headers (api-key, test-token)
295        for (name, value) in self.get_optional_headers() {
296            builder = builder.header(name, value);
297        }
298
299        // Add entity-related headers (container-id, entity-id, external-env)
300        for (name, value) in entity_id::get_entity_headers() {
301            builder = builder.header(name, value);
302        }
303
304        Ok(builder)
305    }
306
307    #[inline]
308    pub fn from_slice(url: &str) -> Endpoint {
309        Endpoint {
310            #[allow(clippy::unwrap_used)]
311            url: parse_uri(url).unwrap(),
312            ..Default::default()
313        }
314    }
315
316    #[inline]
317    pub fn from_url(url: http::Uri) -> Endpoint {
318        Endpoint {
319            url,
320            ..Default::default()
321        }
322    }
323
324    pub fn is_file_endpoint(&self) -> bool {
325        self.url.scheme_str() == Some("file")
326    }
327
328    /// Set a custom timeout for this endpoint.
329    /// If not called, uses the default timeout of 3000ms.
330    ///
331    /// # Arguments
332    /// * `timeout_ms` - Timeout in milliseconds. Pass 0 to use the default timeout (3000ms).
333    ///
334    /// # Returns
335    /// Self with the timeout set, allowing for method chaining
336    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
337        self.timeout_ms = if timeout_ms == 0 {
338            Self::DEFAULT_TIMEOUT
339        } else {
340            timeout_ms
341        };
342        self
343    }
344
345    /// Use the system DNS resolver when building the reqwest client. Only has effect for
346    /// HTTP(S) endpoints.
347    pub fn with_system_resolver(mut self, use_system_resolver: bool) -> Self {
348        self.use_system_resolver = use_system_resolver;
349        self
350    }
351
352    /// Creates a reqwest ClientBuilder configured for this endpoint.
353    ///
354    /// This method handles various endpoint schemes:
355    /// - `http`/`https`: Standard HTTP(S) endpoints
356    /// - `unix`: Unix domain sockets (Unix only)
357    /// - `windows`: Windows named pipes (Windows only)
358    /// - `file`: File dump endpoints for debugging (spawns a local server to capture requests)
359    ///
360    /// The default in-process resolver is used for DNS (fork-safe). To use the system DNS resolver
361    /// instead (less fork-safe), set [`Endpoint::use_system_resolver`] to true via
362    /// [`Endpoint::with_system_resolver`].
363    ///
364    /// # Returns
365    /// A tuple of (ClientBuilder, request_url) where:
366    /// - ClientBuilder is configured with the appropriate transport and timeout
367    /// - request_url is the URL string to use for HTTP requests
368    ///
369    /// # Errors
370    /// Returns an error if:
371    /// - The endpoint scheme is unsupported
372    /// - Path decoding fails
373    /// - The dump server fails to start (for file:// scheme)
374    #[cfg(feature = "reqwest")]
375    pub fn to_reqwest_client_builder(&self) -> anyhow::Result<(reqwest::ClientBuilder, String)> {
376        use anyhow::Context;
377
378        // Don't use proxies, as this calls `getenv` which is unsafe and not
379        // just in theory. It can cause crashes with PHP where php-fpm's env
380        // configuration will mutate the system environment (it doesn't pass
381        // it as part of the SAPI env, it changes the actual system env).
382        let mut builder = reqwest::Client::builder()
383            .timeout(std::time::Duration::from_millis(self.timeout_ms))
384            .hickory_dns(!self.use_system_resolver)
385            .no_proxy();
386
387        let request_url = match self.url.scheme_str() {
388            // HTTP/HTTPS endpoints
389            Some("http") | Some("https") => self.url.to_string(),
390
391            // File dump endpoint (debugging) - uses platform-specific local transport
392            Some("file") => {
393                let output_path = decode_uri_path_in_authority(&self.url)
394                    .context("Failed to decode file path from URI")?;
395                let socket_or_pipe_path = dump_server::spawn_dump_server(output_path)?;
396
397                // Configure the client to use the local socket/pipe
398                #[cfg(unix)]
399                {
400                    builder = builder.unix_socket(socket_or_pipe_path);
401                }
402                #[cfg(windows)]
403                {
404                    builder = builder
405                        .windows_named_pipe(socket_or_pipe_path.to_string_lossy().to_string());
406                }
407
408                "http://localhost/".to_string()
409            }
410
411            // Unix domain sockets
412            #[cfg(unix)]
413            Some("unix") => {
414                use connector::uds::socket_path_from_uri;
415                let socket_path = socket_path_from_uri(&self.url)?;
416                builder = builder.unix_socket(socket_path);
417                format!("http://localhost{}", self.url.path())
418            }
419
420            // Windows named pipes
421            #[cfg(windows)]
422            Some("windows") => {
423                use connector::named_pipe::named_pipe_path_from_uri;
424                let pipe_path = named_pipe_path_from_uri(&self.url)?;
425                builder = builder.windows_named_pipe(pipe_path.to_string_lossy().to_string());
426                format!("http://localhost{}", self.url.path())
427            }
428
429            // Unsupported schemes
430            scheme => anyhow::bail!("Unsupported endpoint scheme: {:?}", scheme),
431        };
432
433        Ok((builder, request_url))
434    }
435}