libdd_telemetry/
config.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use http::{uri::PathAndQuery, Uri};
5use libdd_common::{config::parse_env, parse_uri, Endpoint};
6use std::{borrow::Cow, time::Duration};
7use tracing::debug;
8
9pub const DEFAULT_DD_SITE: &str = "datadoghq.com";
10pub const PROD_INTAKE_SUBDOMAIN: &str = "instrumentation-telemetry-intake";
11
12const DIRECT_TELEMETRY_URL_PATH: &str = "/api/v2/apmtelemetry";
13const AGENT_TELEMETRY_URL_PATH: &str = "/telemetry/proxy/api/v2/apmtelemetry";
14
15#[cfg(unix)]
16const TRACE_SOCKET_PATH: &str = "/var/run/datadog/apm.socket";
17
18const DEFAULT_AGENT_HOST: &str = "localhost";
19const DEFAULT_AGENT_PORT: u16 = 8126;
20
21#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
22pub struct Config {
23    /// Endpoint to send the data to
24    /// This is private and should be interacted with through the set_endpoint function
25    /// to ensure the url path is properly set
26    pub(crate) endpoint: Option<Endpoint>,
27    /// Enables debug logging
28    pub telemetry_debug_logging_enabled: bool,
29    pub telemetry_heartbeat_interval: Duration,
30    pub direct_submission_enabled: bool,
31    /// Prevents LifecycleAction::Stop from terminating the worker (except if the WorkerHandle is
32    /// dropped)
33    pub restartable: bool,
34
35    pub debug_enabled: bool,
36}
37
38fn endpoint_with_telemetry_path(
39    mut endpoint: Endpoint,
40    direct_submission_enabled: bool,
41) -> anyhow::Result<Endpoint> {
42    let mut uri_parts = endpoint.url.into_parts();
43    if uri_parts
44        .scheme
45        .as_ref()
46        .is_some_and(|scheme| scheme.as_str() != "file")
47    {
48        uri_parts.path_and_query = Some(PathAndQuery::from_static(
49            if endpoint.api_key.is_some() && direct_submission_enabled {
50                DIRECT_TELEMETRY_URL_PATH
51            } else {
52                AGENT_TELEMETRY_URL_PATH
53            },
54        ));
55    }
56
57    endpoint.url = Uri::from_parts(uri_parts)?;
58    Ok(endpoint)
59}
60
61/// Settings gathers configuration options we receive from the environment
62/// (either through env variable, or that could be set from the )
63#[derive(Debug)]
64pub struct Settings {
65    // Env parameter
66    pub agent_host: Option<String>,
67    pub trace_agent_port: Option<u16>,
68    pub trace_agent_url: Option<String>,
69    pub trace_pipe_name: Option<String>,
70    pub direct_submission_enabled: bool,
71    pub api_key: Option<String>,
72    pub site: Option<String>,
73    pub telemetry_dd_url: Option<String>,
74    pub telemetry_heartbeat_interval: Duration,
75    pub telemetry_extended_heartbeat_interval: Duration,
76    pub shared_lib_debug: bool,
77
78    // Filesystem check
79    pub agent_uds_socket_found: bool,
80}
81
82impl Default for Settings {
83    fn default() -> Self {
84        Self {
85            agent_host: None,
86            trace_agent_port: None,
87            trace_agent_url: None,
88            trace_pipe_name: None,
89            direct_submission_enabled: false,
90            api_key: None,
91            site: None,
92            telemetry_dd_url: None,
93            telemetry_heartbeat_interval: Duration::from_secs(60),
94            telemetry_extended_heartbeat_interval: Duration::from_secs(60 * 60 * 24),
95            shared_lib_debug: false,
96
97            agent_uds_socket_found: false,
98        }
99    }
100}
101
102impl Settings {
103    // Agent connection configuration
104    const DD_TRACE_AGENT_URL: &'static str = "DD_TRACE_AGENT_URL";
105    const DD_AGENT_HOST: &'static str = "DD_AGENT_HOST";
106    const DD_TRACE_AGENT_PORT: &'static str = "DD_TRACE_AGENT_PORT";
107    // Location of the named pipe on windows. Dotnet specific
108    const DD_TRACE_PIPE_NAME: &'static str = "DD_TRACE_PIPE_NAME";
109
110    // Direct submission configuration
111    const _DD_DIRECT_SUBMISSION_ENABLED: &'static str = "_DD_DIRECT_SUBMISSION_ENABLED";
112    const DD_API_KEY: &'static str = "DD_API_KEY";
113    const DD_SITE: &'static str = "DD_SITE";
114    const DD_APM_TELEMETRY_DD_URL: &'static str = "DD_APM_TELEMETRY_DD_URL";
115
116    // Development and test env variables - should not be used by customers
117    const DD_TELEMETRY_HEARTBEAT_INTERVAL: &'static str = "DD_TELEMETRY_HEARTBEAT_INTERVAL";
118    const DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL: &'static str =
119        "DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL";
120    const _DD_SHARED_LIB_DEBUG: &'static str = "_DD_SHARED_LIB_DEBUG";
121
122    pub fn from_env() -> Self {
123        debug!(
124            config.source = "environment",
125            "Loading telemetry settings from environment variables"
126        );
127        let default = Self::default();
128        Self {
129            agent_host: parse_env::str_not_empty(Self::DD_AGENT_HOST),
130            trace_agent_port: parse_env::int(Self::DD_TRACE_AGENT_PORT),
131            trace_agent_url: parse_env::str_not_empty(Self::DD_TRACE_AGENT_URL)
132                .or(default.trace_agent_url),
133            trace_pipe_name: parse_env::str_not_empty(Self::DD_TRACE_PIPE_NAME)
134                .or(default.trace_pipe_name),
135            direct_submission_enabled: parse_env::bool(Self::_DD_DIRECT_SUBMISSION_ENABLED)
136                .unwrap_or(default.direct_submission_enabled),
137            api_key: parse_env::str_not_empty(Self::DD_API_KEY),
138            site: parse_env::str_not_empty(Self::DD_SITE),
139            telemetry_dd_url: parse_env::str_not_empty(Self::DD_APM_TELEMETRY_DD_URL),
140            telemetry_heartbeat_interval: parse_env::duration(
141                Self::DD_TELEMETRY_HEARTBEAT_INTERVAL,
142            )
143            .unwrap_or(Duration::from_secs(60)),
144            telemetry_extended_heartbeat_interval: parse_env::duration(
145                Self::DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL,
146            )
147            .unwrap_or(Duration::from_secs(60 * 60 * 24)),
148            shared_lib_debug: parse_env::bool(Self::_DD_SHARED_LIB_DEBUG).unwrap_or(false),
149
150            agent_uds_socket_found: (|| {
151                #[cfg(unix)]
152                return std::fs::metadata(TRACE_SOCKET_PATH).is_ok();
153                #[cfg(not(unix))]
154                return false;
155            })(),
156        }
157    }
158}
159
160impl Default for Config {
161    fn default() -> Self {
162        Self {
163            endpoint: None,
164            telemetry_debug_logging_enabled: false,
165            telemetry_heartbeat_interval: Duration::from_secs(60),
166            direct_submission_enabled: false,
167            restartable: false,
168            debug_enabled: false,
169        }
170    }
171}
172
173impl Config {
174    // Implemented following
175    // https://github.com/DataDog/architecture/blob/master/rfcs/apm/integrations/trace-autodetect-agent-config/rfc.md
176    fn trace_agent_url_from_setting(settings: &Settings) -> String {
177        None.or_else(|| {
178            settings
179                .trace_agent_url
180                .as_deref()
181                .filter(|u| {
182                    u.starts_with("unix://")
183                        || u.starts_with("http://")
184                        || u.starts_with("https://")
185                })
186                .map(ToString::to_string)
187        })
188        .or_else(|| {
189            #[cfg(windows)]
190            return settings
191                .trace_pipe_name
192                .as_ref()
193                .map(|pipe_name| format!("windows:{pipe_name}"));
194            #[cfg(not(windows))]
195            return None;
196        })
197        .or_else(|| match (&settings.agent_host, settings.trace_agent_port) {
198            (None, None) => None,
199            _ => Some(format!(
200                "http://{}:{}",
201                settings.agent_host.as_deref().unwrap_or(DEFAULT_AGENT_HOST),
202                settings.trace_agent_port.unwrap_or(DEFAULT_AGENT_PORT),
203            )),
204        })
205        .or_else(|| {
206            #[cfg(unix)]
207            return settings
208                .agent_uds_socket_found
209                .then(|| format!("unix://{TRACE_SOCKET_PATH}"));
210            #[cfg(not(unix))]
211            return None;
212        })
213        .unwrap_or_else(|| format!("http://{DEFAULT_AGENT_HOST}:{DEFAULT_AGENT_PORT}"))
214    }
215
216    fn api_key_from_settings(settings: &Settings) -> Option<Cow<'static, str>> {
217        if !settings.direct_submission_enabled {
218            return None;
219        }
220        settings.api_key.clone().map(Cow::Owned)
221    }
222
223    pub fn endpoint(&self) -> Option<&Endpoint> {
224        self.endpoint.as_ref()
225    }
226
227    pub fn set_endpoint(&mut self, endpoint: Endpoint) -> anyhow::Result<()> {
228        self.endpoint = Some(endpoint_with_telemetry_path(
229            endpoint,
230            self.direct_submission_enabled,
231        )?);
232        Ok(())
233    }
234
235    pub fn set_endpoint_test_token<T: Into<Cow<'static, str>>>(&mut self, test_token: Option<T>) {
236        if let Some(endpoint) = &mut self.endpoint {
237            endpoint.test_token = test_token.map(|t| t.into());
238        }
239    }
240
241    pub fn from_settings(settings: &Settings) -> Self {
242        let trace_agent_url = Self::trace_agent_url_from_setting(settings);
243        let api_key = Self::api_key_from_settings(settings);
244
245        let mut this = Self {
246            endpoint: None,
247            telemetry_debug_logging_enabled: settings.shared_lib_debug,
248            telemetry_heartbeat_interval: settings.telemetry_heartbeat_interval,
249            direct_submission_enabled: settings.direct_submission_enabled,
250            restartable: false,
251            debug_enabled: false,
252        };
253        if let Ok(url) = parse_uri(&trace_agent_url) {
254            let _res = this.set_endpoint(Endpoint {
255                url,
256                api_key,
257                ..Default::default()
258            });
259        }
260
261        this
262    }
263
264    /// Get the configuration of the telemetry worker from env variables
265    pub fn from_env() -> Self {
266        let settings = Settings::from_env();
267        Self::from_settings(&settings)
268    }
269
270    /// set_host sets the host telemetry should connect to.
271    ///
272    /// It handles the following schemes
273    /// * http/https
274    /// * unix sockets unix://\<path to the socket>
275    /// * windows pipes of the format windows:\<pipe name>
276    /// * files, with the format file://\<path to the file>
277    ///
278    ///  If the host_url is http/https, any path will be ignored and replaced by the
279    /// appropriate telemetry endpoint path
280    pub fn set_host_from_url(&mut self, host_url: &str) -> anyhow::Result<()> {
281        let endpoint = self.endpoint.take().unwrap_or_default();
282
283        self.set_endpoint(Endpoint {
284            url: parse_uri(host_url)?,
285            ..endpoint
286        })
287    }
288}
289
290#[cfg(test)]
291mod tests {
292    use std::path::Path;
293
294    #[cfg(unix)]
295    use libdd_common::connector::uds;
296
297    use libdd_common::connector::named_pipe;
298
299    use super::{Config, Settings};
300
301    #[test]
302    fn test_agent_host_detection_trace_agent_url_should_take_precedence() {
303        let cases = [
304            (
305                "http://localhost:1234",
306                "http://localhost:1234/telemetry/proxy/api/v2/apmtelemetry",
307            ),
308            (
309                "unix://./here",
310                "unix://2e2f68657265/telemetry/proxy/api/v2/apmtelemetry",
311            ),
312        ];
313        for (trace_agent_url, expected) in cases {
314            let settings = Settings {
315                trace_agent_url: Some(trace_agent_url.to_owned()),
316                agent_host: Some("example.org".to_owned()),
317                trace_agent_port: Some(1),
318                trace_pipe_name: Some("C:\\foo".to_owned()),
319                agent_uds_socket_found: true,
320                ..Default::default()
321            };
322            let cfg = Config::from_settings(&settings);
323            assert_eq!(cfg.endpoint.unwrap().url.to_string(), expected);
324        }
325    }
326
327    #[test]
328    fn test_agent_host_detection_agent_host_and_port() {
329        let cases = [
330            (
331                Some("example.org"),
332                Some(1),
333                "http://example.org:1/telemetry/proxy/api/v2/apmtelemetry",
334            ),
335            (
336                Some("example.org"),
337                None,
338                "http://example.org:8126/telemetry/proxy/api/v2/apmtelemetry",
339            ),
340            (
341                None,
342                Some(1),
343                "http://localhost:1/telemetry/proxy/api/v2/apmtelemetry",
344            ),
345        ];
346        for (agent_host, trace_agent_port, expected) in cases {
347            let settings = Settings {
348                trace_agent_url: None,
349                agent_host: agent_host.map(ToString::to_string),
350                trace_agent_port,
351                trace_pipe_name: None,
352                agent_uds_socket_found: true,
353                ..Default::default()
354            };
355            let cfg = Config::from_settings(&settings);
356            assert_eq!(cfg.endpoint.unwrap().url.to_string(), expected);
357        }
358    }
359
360    #[test]
361    #[cfg(unix)]
362    fn test_agent_host_detection_socket_found() {
363        let settings = Settings {
364            trace_agent_url: None,
365            agent_host: None,
366            trace_agent_port: None,
367            trace_pipe_name: None,
368            agent_uds_socket_found: true,
369            ..Default::default()
370        };
371        let cfg = Config::from_settings(&settings);
372        assert_eq!(
373            cfg.endpoint.unwrap().url.to_string(),
374            "unix://2f7661722f72756e2f64617461646f672f61706d2e736f636b6574/telemetry/proxy/api/v2/apmtelemetry"
375        );
376    }
377
378    #[test]
379    fn test_agent_host_detection_fallback() {
380        let settings = Settings {
381            trace_agent_url: None,
382            agent_host: None,
383            trace_agent_port: None,
384            trace_pipe_name: None,
385            agent_uds_socket_found: false,
386            ..Default::default()
387        };
388
389        let cfg = Config::from_settings(&settings);
390        assert_eq!(
391            cfg.endpoint.unwrap().url.to_string(),
392            "http://localhost:8126/telemetry/proxy/api/v2/apmtelemetry"
393        );
394    }
395
396    #[test]
397    fn test_config_set_url() {
398        let mut cfg = Config::default();
399
400        cfg.set_host_from_url("http://example.com/any_path_will_be_ignored")
401            .unwrap();
402
403        assert_eq!(
404            "http://example.com/telemetry/proxy/api/v2/apmtelemetry",
405            cfg.clone().endpoint.unwrap().url
406        );
407    }
408
409    #[test]
410    fn test_config_set_url_file() {
411        let cases = [
412            ("file:///absolute/path", "/absolute/path"),
413            ("file://./relative/path", "./relative/path"),
414            ("file://relative/path", "relative/path"),
415            (
416                "file://c://temp//with space\\foo.json",
417                "c://temp//with space\\foo.json",
418            ),
419        ];
420
421        for (input, expected) in cases {
422            let mut cfg = Config::default();
423            cfg.set_host_from_url(input).unwrap();
424
425            assert_eq!(
426                "file",
427                cfg.clone()
428                    .endpoint
429                    .unwrap()
430                    .url
431                    .scheme()
432                    .unwrap()
433                    .to_string()
434            );
435            assert_eq!(
436                Path::new(expected),
437                libdd_common::decode_uri_path_in_authority(&cfg.endpoint.unwrap().url).unwrap(),
438            );
439        }
440    }
441
442    #[test]
443    #[cfg(unix)]
444    fn test_config_set_url_unix_socket() {
445        let mut cfg = Config::default();
446
447        cfg.set_host_from_url("unix:///compatiliby/path").unwrap();
448        assert_eq!(
449            "unix://2f636f6d706174696c6962792f70617468/telemetry/proxy/api/v2/apmtelemetry",
450            cfg.clone().endpoint.unwrap().url.to_string()
451        );
452        assert_eq!(
453            "/compatiliby/path",
454            uds::socket_path_from_uri(&cfg.clone().endpoint.unwrap().url)
455                .unwrap()
456                .to_string_lossy()
457        );
458    }
459
460    #[test]
461    fn test_config_set_url_windows_pipe() {
462        let mut cfg = Config::default();
463
464        cfg.set_host_from_url("windows:C:\\system32\\foo").unwrap();
465        assert_eq!(
466            "windows://433a5c73797374656d33325c666f6f/telemetry/proxy/api/v2/apmtelemetry",
467            cfg.clone().endpoint.unwrap().url.to_string()
468        );
469        assert_eq!(
470            "C:\\system32\\foo",
471            named_pipe::named_pipe_path_from_uri(&cfg.clone().endpoint.unwrap().url)
472                .unwrap()
473                .to_string_lossy()
474        );
475    }
476}