Skip to main content

libdd_common/
lib.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3#![cfg_attr(not(test), deny(clippy::panic))]
4#![cfg_attr(not(test), deny(clippy::unwrap_used))]
5#![cfg_attr(not(test), deny(clippy::expect_used))]
6#![cfg_attr(not(test), deny(clippy::todo))]
7#![cfg_attr(not(test), deny(clippy::unimplemented))]
8
9use anyhow::Context;
10use http::uri;
11use serde::de::Error;
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
14use std::{borrow::Cow, ops::Deref, path::PathBuf, str::FromStr};
15
16pub mod azure_app_services;
17#[cfg(not(target_arch = "wasm32"))]
18pub mod cc_utils;
19#[cfg(not(target_arch = "wasm32"))]
20pub mod connector;
21#[cfg(feature = "reqwest")]
22pub mod dump_server;
23pub mod entity_id;
24pub mod regex_engine;
25#[macro_use]
26pub mod cstr;
27#[cfg(feature = "bench-utils")]
28pub mod bench_utils;
29pub mod config;
30pub mod error;
31pub mod http_common;
32pub mod multipart;
33#[cfg(not(target_arch = "wasm32"))]
34pub mod rate_limiter;
35pub mod tag;
36#[cfg(any(test, feature = "test-utils"))]
37pub mod test_utils;
38#[cfg(not(target_arch = "wasm32"))]
39pub mod threading;
40#[cfg(not(target_arch = "wasm32"))]
41pub mod timeout;
42pub mod unix_utils;
43
44/// Extension trait for `Mutex` to provide a method that acquires a lock, panicking if the lock is
45/// poisoned.
46///
47/// This helper function is intended to be used to avoid having to add many
48/// `#[allow(clippy::unwrap_used)]` annotations if there are a lot of usages of `Mutex`.
49///
50/// # Arguments
51///
52/// * `self` - A reference to the `Mutex` to lock.
53///
54/// # Returns
55///
56/// A `MutexGuard` that provides access to the locked data.
57///
58/// # Panics
59///
60/// This function will panic if the `Mutex` is poisoned.
61///
62/// # Examples
63///
64/// ```
65/// use libdd_common::MutexExt;
66/// use std::sync::{Arc, Mutex};
67///
68/// let data = Arc::new(Mutex::new(5));
69/// let data_clone = Arc::clone(&data);
70///
71/// std::thread::spawn(move || {
72///     let mut num = data_clone.lock_or_panic();
73///     *num += 1;
74/// })
75/// .join()
76/// .expect("Thread panicked");
77///
78/// assert_eq!(*data.lock_or_panic(), 6);
79/// ```
80pub trait MutexExt<T> {
81    fn lock_or_panic(&self) -> MutexGuard<'_, T>;
82}
83
84impl<T> MutexExt<T> for Mutex<T> {
85    #[inline(always)]
86    #[track_caller]
87    fn lock_or_panic(&self) -> MutexGuard<'_, T> {
88        #[allow(clippy::unwrap_used)]
89        self.lock().unwrap()
90    }
91}
92
/// Adds [`RwLockExt::read_or_panic`] and [`RwLockExt::write_or_panic`] to
/// [`std::sync::RwLock`].
///
/// This is the `RwLock` counterpart of [`MutexExt`]. The `unwrap` of the poison-aware
/// `Result` lives here, so lock sites do not need `#[allow(clippy::unwrap_used)]`.
///
/// # Panics
///
/// Both methods panic if the lock is poisoned. They are `#[track_caller]`, so the reported
/// panic location is the caller's.
///
/// # Examples
///
/// ```
/// use libdd_common::RwLockExt;
/// use std::sync::{Arc, RwLock};
///
/// let data = Arc::new(RwLock::new(5));
/// let data_clone = Arc::clone(&data);
///
/// std::thread::spawn(move || {
///     let mut num = data_clone.write_or_panic();
///     *num += 1;
/// })
/// .join()
/// .expect("Thread panicked");
///
/// assert_eq!(*data.read_or_panic(), 6);
/// ```
pub trait RwLockExt<T> {
    /// Acquires shared read access, panicking if the lock is poisoned.
    fn read_or_panic(&self) -> RwLockReadGuard<'_, T>;
    /// Acquires exclusive write access, panicking if the lock is poisoned.
    fn write_or_panic(&self) -> RwLockWriteGuard<'_, T>;
}

impl<T> RwLockExt<T> for RwLock<T> {
    #[inline(always)]
    #[track_caller]
    #[allow(clippy::unwrap_used)] // Centralised unwrap: poisoning is treated as fatal.
    fn read_or_panic(&self) -> RwLockReadGuard<'_, T> {
        let acquired = RwLock::read(self);
        acquired.unwrap()
    }

    #[inline(always)]
    #[track_caller]
    #[allow(clippy::unwrap_used)] // Centralised unwrap: poisoning is treated as fatal.
    fn write_or_panic(&self) -> RwLockWriteGuard<'_, T> {
        let acquired = RwLock::write(self);
        acquired.unwrap()
    }
}
137
/// Unwraps a `Result` whose error type is uninhabited, without any panic path.
///
/// The trait is only implemented for `Result<T, core::convert::Infallible>`. Because
/// [`core::convert::Infallible`] has no values, the compiler proves the `Err` case can never
/// occur and no runtime check or panic is involved.
///
/// The trait is sealed: its `sealed::Sealed` supertrait lives in a private module, so it
/// cannot be implemented for other types outside this crate.
///
/// # Examples
///
/// ```
/// use libdd_common::ResultInfallibleExt;
/// use std::convert::Infallible;
///
/// let result: Result<i32, Infallible> = Ok(42);
/// assert_eq!(result.unwrap_infallible(), 42);
/// ```
pub trait ResultInfallibleExt<T>: sealed::Sealed {
    /// Returns the contained `Ok` value.
    fn unwrap_infallible(self) -> T;
}

impl<T> ResultInfallibleExt<T> for Result<T, core::convert::Infallible> {
    #[inline(always)]
    fn unwrap_infallible(self) -> T {
        // An empty match on an uninhabited value type-checks as any type, so this closure
        // can never actually run.
        self.unwrap_or_else(|never| match never {})
    }
}

mod sealed {
    /// Private supertrait that restricts [`super::ResultInfallibleExt`] to the impl below.
    pub trait Sealed {}

    impl<T> Sealed for Result<T, core::convert::Infallible> {}
}
171
172pub mod header {
173    #![allow(clippy::declare_interior_mutable_const)]
174    use http::{header::HeaderName, HeaderValue};
175
176    pub const APPLICATION_MSGPACK_STR: &str = "application/msgpack";
177    pub const APPLICATION_PROTOBUF_STR: &str = "application/x-protobuf";
178
179    pub const DATADOG_CONTAINER_ID: HeaderName = HeaderName::from_static("datadog-container-id");
180    pub const DATADOG_ENTITY_ID: HeaderName = HeaderName::from_static("datadog-entity-id");
181    pub const DATADOG_EXTERNAL_ENV: HeaderName = HeaderName::from_static("datadog-external-env");
182    pub const DATADOG_TRACE_COUNT: HeaderName = HeaderName::from_static("x-datadog-trace-count");
183    /// Signal to the agent to send 429 responses when a payload is dropped
184    /// If this is not set then the agent will always return a 200 regardless if the payload is
185    /// dropped.
186    pub const DATADOG_SEND_REAL_HTTP_STATUS: HeaderName =
187        HeaderName::from_static("datadog-send-real-http-status");
188    pub const DATADOG_API_KEY: HeaderName = HeaderName::from_static("dd-api-key");
189    pub const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");
190    pub const APPLICATION_MSGPACK: HeaderValue = HeaderValue::from_static(APPLICATION_MSGPACK_STR);
191    pub const APPLICATION_PROTOBUF: HeaderValue =
192        HeaderValue::from_static(APPLICATION_PROTOBUF_STR);
193    pub const X_DATADOG_TEST_SESSION_TOKEN: HeaderName =
194        HeaderName::from_static("x-datadog-test-session-token");
195}
196
197#[cfg(not(target_arch = "wasm32"))]
198pub type HttpClient = http_common::GenericHttpClient<connector::Connector>;
199#[cfg(not(target_arch = "wasm32"))]
200pub type HttpResponse = http_common::HttpResponse;
201pub type HttpRequestBuilder = http::request::Builder;
202#[cfg(not(target_arch = "wasm32"))]
203pub trait Connect:
204    hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
205{
206}
207#[cfg(not(target_arch = "wasm32"))]
208impl<C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static> Connect
209    for C
210{
211}
212
213// Used by tag! macro
214pub use const_format;
215
216#[derive(Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
217pub struct Endpoint {
218    #[serde(serialize_with = "serialize_uri", deserialize_with = "deserialize_uri")]
219    pub url: http::Uri,
220    pub api_key: Option<Cow<'static, str>>,
221    pub timeout_ms: u64,
222    /// Sets X-Datadog-Test-Session-Token header on any request
223    pub test_token: Option<Cow<'static, str>>,
224    /// Use the system DNS resolver when building the HTTP client. If false, the default
225    /// in-process resolver is used.
226    #[serde(default)]
227    pub use_system_resolver: bool,
228}
229
230impl Default for Endpoint {
231    fn default() -> Self {
232        Endpoint {
233            url: http::Uri::default(),
234            api_key: None,
235            timeout_ms: Self::DEFAULT_TIMEOUT,
236            test_token: None,
237            use_system_resolver: false,
238        }
239    }
240}
241
242#[derive(serde::Deserialize, serde::Serialize)]
243struct SerializedUri<'a> {
244    scheme: Option<Cow<'a, str>>,
245    authority: Option<Cow<'a, str>>,
246    path_and_query: Option<Cow<'a, str>>,
247}
248
249fn serialize_uri<S>(uri: &http::Uri, serializer: S) -> Result<S::Ok, S::Error>
250where
251    S: Serializer,
252{
253    let parts = uri.clone().into_parts();
254    let uri = SerializedUri {
255        scheme: parts.scheme.as_ref().map(|s| Cow::Borrowed(s.as_str())),
256        authority: parts.authority.as_ref().map(|s| Cow::Borrowed(s.as_str())),
257        path_and_query: parts
258            .path_and_query
259            .as_ref()
260            .map(|s| Cow::Borrowed(s.as_str())),
261    };
262    uri.serialize(serializer)
263}
264
265fn deserialize_uri<'de, D>(deserializer: D) -> Result<http::Uri, D::Error>
266where
267    D: Deserializer<'de>,
268{
269    let uri = SerializedUri::deserialize(deserializer)?;
270    let mut builder = http::Uri::builder();
271    if let Some(v) = uri.authority {
272        builder = builder.authority(v.deref());
273    }
274    if let Some(v) = uri.scheme {
275        builder = builder.scheme(v.deref());
276    }
277    if let Some(v) = uri.path_and_query {
278        builder = builder.path_and_query(v.deref());
279    }
280
281    builder.build().map_err(Error::custom)
282}
283
284/// TODO: we should properly handle malformed urls
285/// * For windows and unix schemes:
286///     * For compatibility reasons with existing implementation this parser stores the encoded path
287///       in authority section as there is no existing standard [see](https://github.com/whatwg/url/issues/577)
288///       that covers this. We need to pick one hack or another
289///     * For windows, interprets everything after windows: as path
290///     * For unix, interprets everything after unix:// as path
291/// * For file scheme implementation will simply backfill missing authority section
292pub fn parse_uri(uri: &str) -> anyhow::Result<http::Uri> {
293    if let Some(path) = uri.strip_prefix("unix://") {
294        encode_uri_path_in_authority("unix", path)
295    } else if let Some(path) = uri.strip_prefix("windows:") {
296        encode_uri_path_in_authority("windows", path)
297    } else if let Some(path) = uri.strip_prefix("file://") {
298        encode_uri_path_in_authority("file", path)
299    } else {
300        Ok(http::Uri::from_str(uri)?)
301    }
302}
303
304fn encode_uri_path_in_authority(scheme: &str, path: &str) -> anyhow::Result<http::Uri> {
305    let mut parts = uri::Parts::default();
306    parts.scheme = uri::Scheme::from_str(scheme).ok();
307
308    let path = hex::encode(path);
309
310    parts.authority = uri::Authority::from_str(path.as_str()).ok();
311    parts.path_and_query = Some(uri::PathAndQuery::from_static(""));
312    Ok(http::Uri::from_parts(parts)?)
313}
314
315pub fn decode_uri_path_in_authority(uri: &http::Uri) -> anyhow::Result<PathBuf> {
316    let path = hex::decode(uri.authority().context("missing uri authority")?.as_str())?;
317    #[cfg(unix)]
318    {
319        use std::os::unix::ffi::OsStringExt;
320        Ok(PathBuf::from(std::ffi::OsString::from_vec(path)))
321    }
322    #[cfg(not(unix))]
323    {
324        match String::from_utf8(path) {
325            Ok(s) => Ok(PathBuf::from(s.as_str())),
326            _ => Err(anyhow::anyhow!("file uri should be utf-8")),
327        }
328    }
329}
330
331impl Endpoint {
332    /// Default value for the timeout field in milliseconds.
333    pub const DEFAULT_TIMEOUT: u64 = 3_000;
334
335    /// Returns an iterator of optional endpoint-specific headers (api-key, test-token)
336    /// as (header_name, header_value) string tuples for any that are available.
337    pub fn get_optional_headers(&self) -> impl Iterator<Item = (&'static str, &str)> {
338        [
339            self.api_key.as_ref().map(|v| ("dd-api-key", v.as_ref())),
340            self.test_token
341                .as_ref()
342                .map(|v| ("x-datadog-test-session-token", v.as_ref())),
343        ]
344        .into_iter()
345        .flatten()
346    }
347
348    /// Apply standard headers (user-agent, api-key, test-token, entity headers) to an
349    /// [`http::request::Builder`].
350    pub fn set_standard_headers(
351        &self,
352        mut builder: http::request::Builder,
353        user_agent: &str,
354    ) -> http::request::Builder {
355        builder = builder.header("user-agent", user_agent);
356        for (name, value) in self.get_optional_headers() {
357            builder = builder.header(name, value);
358        }
359        for (name, value) in entity_id::get_entity_headers() {
360            builder = builder.header(name, value);
361        }
362        builder
363    }
364
365    /// Return a request builder with the following headers:
366    /// - User agent
367    /// - Api key
368    /// - Container Id/Entity Id
369    pub fn to_request_builder(&self, user_agent: &str) -> anyhow::Result<HttpRequestBuilder> {
370        let mut builder = http::Request::builder()
371            .uri(self.url.clone())
372            .header(http::header::USER_AGENT, user_agent);
373
374        // Add optional endpoint headers (api-key, test-token)
375        for (name, value) in self.get_optional_headers() {
376            builder = builder.header(name, value);
377        }
378
379        // Add entity-related headers (container-id, entity-id, external-env)
380        for (name, value) in entity_id::get_entity_headers() {
381            builder = builder.header(name, value);
382        }
383
384        Ok(builder)
385    }
386
387    #[inline]
388    pub fn from_slice(url: &str) -> Endpoint {
389        Endpoint {
390            #[allow(clippy::unwrap_used)]
391            url: parse_uri(url).unwrap(),
392            ..Default::default()
393        }
394    }
395
396    #[inline]
397    pub fn from_url(url: http::Uri) -> Endpoint {
398        Endpoint {
399            url,
400            ..Default::default()
401        }
402    }
403
404    pub fn is_file_endpoint(&self) -> bool {
405        self.url.scheme_str() == Some("file")
406    }
407
408    /// Set a custom timeout for this endpoint.
409    /// If not called, uses the default timeout of 3000ms.
410    ///
411    /// # Arguments
412    /// * `timeout_ms` - Timeout in milliseconds. Pass 0 to use the default timeout (3000ms).
413    ///
414    /// # Returns
415    /// Self with the timeout set, allowing for method chaining
416    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
417        self.timeout_ms = if timeout_ms == 0 {
418            Self::DEFAULT_TIMEOUT
419        } else {
420            timeout_ms
421        };
422        self
423    }
424
425    /// Use the system DNS resolver when building the reqwest client. Only has effect for
426    /// HTTP(S) endpoints.
427    pub fn with_system_resolver(mut self, use_system_resolver: bool) -> Self {
428        self.use_system_resolver = use_system_resolver;
429        self
430    }
431
432    /// Creates a reqwest ClientBuilder configured for this endpoint.
433    ///
434    /// This method handles various endpoint schemes:
435    /// - `http`/`https`: Standard HTTP(S) endpoints
436    /// - `unix`: Unix domain sockets (Unix only)
437    /// - `windows`: Windows named pipes (Windows only)
438    /// - `file`: File dump endpoints for debugging (spawns a local server to capture requests)
439    ///
440    /// The default in-process resolver is used for DNS (fork-safe). To use the system DNS resolver
441    /// instead (less fork-safe), set [`Endpoint::use_system_resolver`] to true via
442    /// [`Endpoint::with_system_resolver`].
443    ///
444    /// # Returns
445    /// A tuple of (ClientBuilder, request_url) where:
446    /// - ClientBuilder is configured with the appropriate transport and timeout
447    /// - request_url is the URL string to use for HTTP requests
448    ///
449    /// # Errors
450    /// Returns an error if:
451    /// - The endpoint scheme is unsupported
452    /// - Path decoding fails
453    /// - The dump server fails to start (for file:// scheme)
454    #[cfg(feature = "reqwest")]
455    pub fn to_reqwest_client_builder(&self) -> anyhow::Result<(reqwest::ClientBuilder, String)> {
456        use anyhow::Context;
457
458        // Don't use proxies, as this calls `getenv` which is unsafe and not
459        // just in theory. It can cause crashes with PHP where php-fpm's env
460        // configuration will mutate the system environment (it doesn't pass
461        // it as part of the SAPI env, it changes the actual system env).
462        let mut builder = reqwest::Client::builder()
463            .timeout(std::time::Duration::from_millis(self.timeout_ms))
464            .hickory_dns(!self.use_system_resolver)
465            .no_proxy();
466
467        let request_url = match self.url.scheme_str() {
468            // HTTP/HTTPS endpoints
469            Some("http") | Some("https") => self.url.to_string(),
470
471            // File dump endpoint (debugging) - uses platform-specific local transport
472            Some("file") => {
473                let output_path = decode_uri_path_in_authority(&self.url)
474                    .context("Failed to decode file path from URI")?;
475                let socket_or_pipe_path = dump_server::spawn_dump_server(output_path)?;
476
477                // Configure the client to use the local socket/pipe
478                #[cfg(unix)]
479                {
480                    builder = builder.unix_socket(socket_or_pipe_path);
481                }
482                #[cfg(windows)]
483                {
484                    builder = builder
485                        .windows_named_pipe(socket_or_pipe_path.to_string_lossy().to_string());
486                }
487
488                "http://localhost/".to_string()
489            }
490
491            // Unix domain sockets
492            #[cfg(unix)]
493            Some("unix") => {
494                use connector::uds::socket_path_from_uri;
495                let socket_path = socket_path_from_uri(&self.url)?;
496                builder = builder.unix_socket(socket_path);
497                format!("http://localhost{}", self.url.path())
498            }
499
500            // Windows named pipes
501            #[cfg(windows)]
502            Some("windows") => {
503                use connector::named_pipe::named_pipe_path_from_uri;
504                let pipe_path = named_pipe_path_from_uri(&self.url)?;
505                builder = builder.windows_named_pipe(pipe_path.to_string_lossy().to_string());
506                format!("http://localhost{}", self.url.path())
507            }
508
509            // Unsupported schemes
510            scheme => anyhow::bail!("Unsupported endpoint scheme: {:?}", scheme),
511        };
512
513        Ok((builder, request_url))
514    }
515}