1use http::{uri::PathAndQuery, Uri};
5use libdd_common::{config::parse_env, parse_uri, Endpoint};
6use std::{borrow::Cow, time::Duration};
7use tracing::debug;
8
/// Default Datadog site domain.
///
/// NOTE(review): not referenced in this file; presumably the fallback used when
/// `DD_SITE` is unset — confirm against callers.
pub const DEFAULT_DD_SITE: &str = "datadoghq.com";
/// Subdomain of the production telemetry intake.
///
/// NOTE(review): not referenced in this file; presumably combined with the site
/// to build the direct-submission host — confirm against callers.
pub const PROD_INTAKE_SUBDOMAIN: &str = "instrumentation-telemetry-intake";

/// URL path applied to the endpoint when it carries an API key and direct
/// submission is enabled.
const DIRECT_TELEMETRY_URL_PATH: &str = "/api/v2/apmtelemetry";
/// URL path applied to the endpoint in every other case (submission through
/// the agent's telemetry proxy route).
const AGENT_TELEMETRY_URL_PATH: &str = "/telemetry/proxy/api/v2/apmtelemetry";

/// Filesystem path probed for the trace agent Unix domain socket, and used to
/// build the `unix://` agent URL when the socket was found.
#[cfg(unix)]
const TRACE_SOCKET_PATH: &str = "/var/run/datadog/apm.socket";

/// Host used for the agent URL when no host is configured.
const DEFAULT_AGENT_HOST: &str = "localhost";
/// Port used for the agent URL when no port is configured.
const DEFAULT_AGENT_PORT: u16 = 8126;
20
/// Resolved telemetry configuration.
///
/// Built either from defaults, from a [`Settings`] value (`Config::from_settings`)
/// or straight from the environment (`Config::from_env`).
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct Config {
    /// Endpoint telemetry is sent to; `None` until an endpoint has been set.
    /// Always stored with the telemetry URL path already applied (see
    /// `Config::set_endpoint`).
    pub(crate) endpoint: Option<Endpoint>,
    /// Populated from `Settings::shared_lib_debug` by `Config::from_settings`.
    pub telemetry_debug_logging_enabled: bool,
    /// Heartbeat interval; 60 seconds by default.
    pub telemetry_heartbeat_interval: Duration,
    /// When `true` and the endpoint carries an API key, the direct intake URL
    /// path is used instead of the agent proxy path.
    pub direct_submission_enabled: bool,
    /// Always initialised to `false` in this file.
    /// NOTE(review): semantics are defined by consumers outside this file — confirm.
    pub restartable: bool,

    /// Always initialised to `false` in this file.
    /// NOTE(review): semantics are defined by consumers outside this file — confirm.
    pub debug_enabled: bool,
}
37
38fn endpoint_with_telemetry_path(
39 mut endpoint: Endpoint,
40 direct_submission_enabled: bool,
41) -> anyhow::Result<Endpoint> {
42 let mut uri_parts = endpoint.url.into_parts();
43 if uri_parts
44 .scheme
45 .as_ref()
46 .is_some_and(|scheme| scheme.as_str() != "file")
47 {
48 uri_parts.path_and_query = Some(PathAndQuery::from_static(
49 if endpoint.api_key.is_some() && direct_submission_enabled {
50 DIRECT_TELEMETRY_URL_PATH
51 } else {
52 AGENT_TELEMETRY_URL_PATH
53 },
54 ));
55 }
56
57 endpoint.url = Uri::from_parts(uri_parts)?;
58 Ok(endpoint)
59}
60
/// Raw telemetry settings, as gathered from the environment by
/// `Settings::from_env`, before they are resolved into a [`Config`].
#[derive(Debug)]
pub struct Settings {
    /// Agent host name, read from `DD_AGENT_HOST`.
    pub agent_host: Option<String>,
    /// Trace agent port, read from `DD_TRACE_AGENT_PORT`.
    pub trace_agent_port: Option<u16>,
    /// Full trace agent URL, read from `DD_TRACE_AGENT_URL`. Only honoured when
    /// it starts with `unix://`, `http://` or `https://`.
    pub trace_agent_url: Option<String>,
    /// Named pipe of the trace agent, read from `DD_TRACE_PIPE_NAME`. Only
    /// consulted on Windows builds.
    pub trace_pipe_name: Option<String>,
    /// Read from `_DD_DIRECT_SUBMISSION_ENABLED`; the API key is ignored unless
    /// this is `true`.
    pub direct_submission_enabled: bool,
    /// API key, read from `DD_API_KEY`.
    pub api_key: Option<String>,
    /// Read from `DD_SITE`. Not consumed when building a `Config` in this file.
    pub site: Option<String>,
    /// Read from `DD_APM_TELEMETRY_DD_URL`. Not consumed when building a
    /// `Config` in this file.
    pub telemetry_dd_url: Option<String>,
    /// Read from `DD_TELEMETRY_HEARTBEAT_INTERVAL`; defaults to 60 seconds.
    pub telemetry_heartbeat_interval: Duration,
    /// Read from `DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL`; defaults to 24 hours.
    pub telemetry_extended_heartbeat_interval: Duration,
    /// Read from `_DD_SHARED_LIB_DEBUG`; becomes
    /// `Config::telemetry_debug_logging_enabled`.
    pub shared_lib_debug: bool,

    /// Whether the default trace agent Unix socket was found on the filesystem.
    /// Always `false` on non-Unix builds when filled by `Settings::from_env`.
    pub agent_uds_socket_found: bool,
}
81
82impl Default for Settings {
83 fn default() -> Self {
84 Self {
85 agent_host: None,
86 trace_agent_port: None,
87 trace_agent_url: None,
88 trace_pipe_name: None,
89 direct_submission_enabled: false,
90 api_key: None,
91 site: None,
92 telemetry_dd_url: None,
93 telemetry_heartbeat_interval: Duration::from_secs(60),
94 telemetry_extended_heartbeat_interval: Duration::from_secs(60 * 60 * 24),
95 shared_lib_debug: false,
96
97 agent_uds_socket_found: false,
98 }
99 }
100}
101
102impl Settings {
103 const DD_TRACE_AGENT_URL: &'static str = "DD_TRACE_AGENT_URL";
105 const DD_AGENT_HOST: &'static str = "DD_AGENT_HOST";
106 const DD_TRACE_AGENT_PORT: &'static str = "DD_TRACE_AGENT_PORT";
107 const DD_TRACE_PIPE_NAME: &'static str = "DD_TRACE_PIPE_NAME";
109
110 const _DD_DIRECT_SUBMISSION_ENABLED: &'static str = "_DD_DIRECT_SUBMISSION_ENABLED";
112 const DD_API_KEY: &'static str = "DD_API_KEY";
113 const DD_SITE: &'static str = "DD_SITE";
114 const DD_APM_TELEMETRY_DD_URL: &'static str = "DD_APM_TELEMETRY_DD_URL";
115
116 const DD_TELEMETRY_HEARTBEAT_INTERVAL: &'static str = "DD_TELEMETRY_HEARTBEAT_INTERVAL";
118 const DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL: &'static str =
119 "DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL";
120 const _DD_SHARED_LIB_DEBUG: &'static str = "_DD_SHARED_LIB_DEBUG";
121
122 pub fn from_env() -> Self {
123 debug!(
124 config.source = "environment",
125 "Loading telemetry settings from environment variables"
126 );
127 let default = Self::default();
128 Self {
129 agent_host: parse_env::str_not_empty(Self::DD_AGENT_HOST),
130 trace_agent_port: parse_env::int(Self::DD_TRACE_AGENT_PORT),
131 trace_agent_url: parse_env::str_not_empty(Self::DD_TRACE_AGENT_URL)
132 .or(default.trace_agent_url),
133 trace_pipe_name: parse_env::str_not_empty(Self::DD_TRACE_PIPE_NAME)
134 .or(default.trace_pipe_name),
135 direct_submission_enabled: parse_env::bool(Self::_DD_DIRECT_SUBMISSION_ENABLED)
136 .unwrap_or(default.direct_submission_enabled),
137 api_key: parse_env::str_not_empty(Self::DD_API_KEY),
138 site: parse_env::str_not_empty(Self::DD_SITE),
139 telemetry_dd_url: parse_env::str_not_empty(Self::DD_APM_TELEMETRY_DD_URL),
140 telemetry_heartbeat_interval: parse_env::duration(
141 Self::DD_TELEMETRY_HEARTBEAT_INTERVAL,
142 )
143 .unwrap_or(Duration::from_secs(60)),
144 telemetry_extended_heartbeat_interval: parse_env::duration(
145 Self::DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL,
146 )
147 .unwrap_or(Duration::from_secs(60 * 60 * 24)),
148 shared_lib_debug: parse_env::bool(Self::_DD_SHARED_LIB_DEBUG).unwrap_or(false),
149
150 agent_uds_socket_found: (|| {
151 #[cfg(unix)]
152 return std::fs::metadata(TRACE_SOCKET_PATH).is_ok();
153 #[cfg(not(unix))]
154 return false;
155 })(),
156 }
157 }
158}
159
160impl Default for Config {
161 fn default() -> Self {
162 Self {
163 endpoint: None,
164 telemetry_debug_logging_enabled: false,
165 telemetry_heartbeat_interval: Duration::from_secs(60),
166 direct_submission_enabled: false,
167 restartable: false,
168 debug_enabled: false,
169 }
170 }
171}
172
173impl Config {
174 fn trace_agent_url_from_setting(settings: &Settings) -> String {
177 None.or_else(|| {
178 settings
179 .trace_agent_url
180 .as_deref()
181 .filter(|u| {
182 u.starts_with("unix://")
183 || u.starts_with("http://")
184 || u.starts_with("https://")
185 })
186 .map(ToString::to_string)
187 })
188 .or_else(|| {
189 #[cfg(windows)]
190 return settings
191 .trace_pipe_name
192 .as_ref()
193 .map(|pipe_name| format!("windows:{pipe_name}"));
194 #[cfg(not(windows))]
195 return None;
196 })
197 .or_else(|| match (&settings.agent_host, settings.trace_agent_port) {
198 (None, None) => None,
199 _ => Some(format!(
200 "http://{}:{}",
201 settings.agent_host.as_deref().unwrap_or(DEFAULT_AGENT_HOST),
202 settings.trace_agent_port.unwrap_or(DEFAULT_AGENT_PORT),
203 )),
204 })
205 .or_else(|| {
206 #[cfg(unix)]
207 return settings
208 .agent_uds_socket_found
209 .then(|| format!("unix://{TRACE_SOCKET_PATH}"));
210 #[cfg(not(unix))]
211 return None;
212 })
213 .unwrap_or_else(|| format!("http://{DEFAULT_AGENT_HOST}:{DEFAULT_AGENT_PORT}"))
214 }
215
216 fn api_key_from_settings(settings: &Settings) -> Option<Cow<'static, str>> {
217 if !settings.direct_submission_enabled {
218 return None;
219 }
220 settings.api_key.clone().map(Cow::Owned)
221 }
222
223 pub fn endpoint(&self) -> Option<&Endpoint> {
224 self.endpoint.as_ref()
225 }
226
227 pub fn set_endpoint(&mut self, endpoint: Endpoint) -> anyhow::Result<()> {
228 self.endpoint = Some(endpoint_with_telemetry_path(
229 endpoint,
230 self.direct_submission_enabled,
231 )?);
232 Ok(())
233 }
234
235 pub fn set_endpoint_test_token<T: Into<Cow<'static, str>>>(&mut self, test_token: Option<T>) {
236 if let Some(endpoint) = &mut self.endpoint {
237 endpoint.test_token = test_token.map(|t| t.into());
238 }
239 }
240
241 pub fn from_settings(settings: &Settings) -> Self {
242 let trace_agent_url = Self::trace_agent_url_from_setting(settings);
243 let api_key = Self::api_key_from_settings(settings);
244
245 let mut this = Self {
246 endpoint: None,
247 telemetry_debug_logging_enabled: settings.shared_lib_debug,
248 telemetry_heartbeat_interval: settings.telemetry_heartbeat_interval,
249 direct_submission_enabled: settings.direct_submission_enabled,
250 restartable: false,
251 debug_enabled: false,
252 };
253 if let Ok(url) = parse_uri(&trace_agent_url) {
254 let _res = this.set_endpoint(Endpoint {
255 url,
256 api_key,
257 ..Default::default()
258 });
259 }
260
261 this
262 }
263
264 pub fn from_env() -> Self {
266 let settings = Settings::from_env();
267 Self::from_settings(&settings)
268 }
269
270 pub fn set_host_from_url(&mut self, host_url: &str) -> anyhow::Result<()> {
281 let endpoint = self.endpoint.take().unwrap_or_default();
282
283 self.set_endpoint(Endpoint {
284 url: parse_uri(host_url)?,
285 ..endpoint
286 })
287 }
288}
289
290#[cfg(test)]
291mod tests {
292 use std::path::Path;
293
294 #[cfg(unix)]
295 use libdd_common::connector::uds;
296
297 use libdd_common::connector::named_pipe;
298
299 use super::{Config, Settings};
300
301 #[test]
302 fn test_agent_host_detection_trace_agent_url_should_take_precedence() {
303 let cases = [
304 (
305 "http://localhost:1234",
306 "http://localhost:1234/telemetry/proxy/api/v2/apmtelemetry",
307 ),
308 (
309 "unix://./here",
310 "unix://2e2f68657265/telemetry/proxy/api/v2/apmtelemetry",
311 ),
312 ];
313 for (trace_agent_url, expected) in cases {
314 let settings = Settings {
315 trace_agent_url: Some(trace_agent_url.to_owned()),
316 agent_host: Some("example.org".to_owned()),
317 trace_agent_port: Some(1),
318 trace_pipe_name: Some("C:\\foo".to_owned()),
319 agent_uds_socket_found: true,
320 ..Default::default()
321 };
322 let cfg = Config::from_settings(&settings);
323 assert_eq!(cfg.endpoint.unwrap().url.to_string(), expected);
324 }
325 }
326
327 #[test]
328 fn test_agent_host_detection_agent_host_and_port() {
329 let cases = [
330 (
331 Some("example.org"),
332 Some(1),
333 "http://example.org:1/telemetry/proxy/api/v2/apmtelemetry",
334 ),
335 (
336 Some("example.org"),
337 None,
338 "http://example.org:8126/telemetry/proxy/api/v2/apmtelemetry",
339 ),
340 (
341 None,
342 Some(1),
343 "http://localhost:1/telemetry/proxy/api/v2/apmtelemetry",
344 ),
345 ];
346 for (agent_host, trace_agent_port, expected) in cases {
347 let settings = Settings {
348 trace_agent_url: None,
349 agent_host: agent_host.map(ToString::to_string),
350 trace_agent_port,
351 trace_pipe_name: None,
352 agent_uds_socket_found: true,
353 ..Default::default()
354 };
355 let cfg = Config::from_settings(&settings);
356 assert_eq!(cfg.endpoint.unwrap().url.to_string(), expected);
357 }
358 }
359
360 #[test]
361 #[cfg(unix)]
362 fn test_agent_host_detection_socket_found() {
363 let settings = Settings {
364 trace_agent_url: None,
365 agent_host: None,
366 trace_agent_port: None,
367 trace_pipe_name: None,
368 agent_uds_socket_found: true,
369 ..Default::default()
370 };
371 let cfg = Config::from_settings(&settings);
372 assert_eq!(
373 cfg.endpoint.unwrap().url.to_string(),
374 "unix://2f7661722f72756e2f64617461646f672f61706d2e736f636b6574/telemetry/proxy/api/v2/apmtelemetry"
375 );
376 }
377
378 #[test]
379 fn test_agent_host_detection_fallback() {
380 let settings = Settings {
381 trace_agent_url: None,
382 agent_host: None,
383 trace_agent_port: None,
384 trace_pipe_name: None,
385 agent_uds_socket_found: false,
386 ..Default::default()
387 };
388
389 let cfg = Config::from_settings(&settings);
390 assert_eq!(
391 cfg.endpoint.unwrap().url.to_string(),
392 "http://localhost:8126/telemetry/proxy/api/v2/apmtelemetry"
393 );
394 }
395
396 #[test]
397 fn test_config_set_url() {
398 let mut cfg = Config::default();
399
400 cfg.set_host_from_url("http://example.com/any_path_will_be_ignored")
401 .unwrap();
402
403 assert_eq!(
404 "http://example.com/telemetry/proxy/api/v2/apmtelemetry",
405 cfg.clone().endpoint.unwrap().url
406 );
407 }
408
409 #[test]
410 fn test_config_set_url_file() {
411 let cases = [
412 ("file:///absolute/path", "/absolute/path"),
413 ("file://./relative/path", "./relative/path"),
414 ("file://relative/path", "relative/path"),
415 (
416 "file://c://temp//with space\\foo.json",
417 "c://temp//with space\\foo.json",
418 ),
419 ];
420
421 for (input, expected) in cases {
422 let mut cfg = Config::default();
423 cfg.set_host_from_url(input).unwrap();
424
425 assert_eq!(
426 "file",
427 cfg.clone()
428 .endpoint
429 .unwrap()
430 .url
431 .scheme()
432 .unwrap()
433 .to_string()
434 );
435 assert_eq!(
436 Path::new(expected),
437 libdd_common::decode_uri_path_in_authority(&cfg.endpoint.unwrap().url).unwrap(),
438 );
439 }
440 }
441
442 #[test]
443 #[cfg(unix)]
444 fn test_config_set_url_unix_socket() {
445 let mut cfg = Config::default();
446
447 cfg.set_host_from_url("unix:///compatiliby/path").unwrap();
448 assert_eq!(
449 "unix://2f636f6d706174696c6962792f70617468/telemetry/proxy/api/v2/apmtelemetry",
450 cfg.clone().endpoint.unwrap().url.to_string()
451 );
452 assert_eq!(
453 "/compatiliby/path",
454 uds::socket_path_from_uri(&cfg.clone().endpoint.unwrap().url)
455 .unwrap()
456 .to_string_lossy()
457 );
458 }
459
460 #[test]
461 fn test_config_set_url_windows_pipe() {
462 let mut cfg = Config::default();
463
464 cfg.set_host_from_url("windows:C:\\system32\\foo").unwrap();
465 assert_eq!(
466 "windows://433a5c73797374656d33325c666f6f/telemetry/proxy/api/v2/apmtelemetry",
467 cfg.clone().endpoint.unwrap().url.to_string()
468 );
469 assert_eq!(
470 "C:\\system32\\foo",
471 named_pipe::named_pipe_path_from_uri(&cfg.clone().endpoint.unwrap().url)
472 .unwrap()
473 .to_string_lossy()
474 );
475 }
476}