1use crate::{
2 log::{DatadogLog, FieldVisitor},
3 span::{DatadogSpan, SpanAttributeVisitor, SpanLink},
4};
5use jiff::Zoned;
6use reqwest::header::HeaderValue;
7use std::{
8 borrow::Cow,
9 collections::HashMap,
10 fmt::{Display, Formatter},
11 marker::PhantomData,
12 sync::{Arc, Mutex, mpsc},
13 thread::spawn,
14 time::{SystemTime, UNIX_EPOCH},
15};
16use tracing_core::{
17 Event, Subscriber,
18 span::{Attributes, Id, Record},
19};
20use tracing_subscriber::{
21 Layer,
22 layer::Context,
23 registry::{LookupSpan, Scope},
24};
25
26#[derive(Debug)]
44pub struct DatadogTraceLayer<S> {
45 buffer: Arc<Mutex<Vec<DatadogSpan>>>,
46 service: String,
47 default_tags: HashMap<Cow<'static, str>, String>,
48 logging_enabled: bool,
49 #[cfg(feature = "http")]
50 with_context: crate::http::WithContext,
51 shutdown: mpsc::Sender<()>,
52 _registry: PhantomData<S>,
53}
54
55impl<S> DatadogTraceLayer<S>
56where
57 S: Subscriber + for<'a> LookupSpan<'a>,
58{
59 pub fn builder() -> DatadogTraceLayerBuilder<S> {
61 DatadogTraceLayerBuilder {
62 service: None,
63 default_tags: HashMap::from_iter([("span.kind".into(), "internal".to_string())]),
64 agent_address: None,
65 container_id: None,
66 logging_enabled: false,
67 phantom_data: Default::default(),
68 }
69 }
70
71 #[cfg(feature = "http")]
72 fn get_context(
73 dispatch: &tracing_core::Dispatch,
74 id: &Id,
75 f: &mut dyn FnMut(&mut DatadogSpan),
76 ) {
77 let subscriber = dispatch
78 .downcast_ref::<S>()
79 .expect("Subscriber did not downcast to expected type, this is a bug");
80 let span = subscriber.span(id).expect("Span not found, this is a bug");
81
82 let mut extensions = span.extensions_mut();
83 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
84 f(dd_span);
85 }
86 }
87}
88
89impl<S> Drop for DatadogTraceLayer<S> {
90 fn drop(&mut self) {
91 let _ = self.shutdown.send(());
92 }
93}
94
95impl<S> Layer<S> for DatadogTraceLayer<S>
96where
97 S: Subscriber + for<'a> LookupSpan<'a>,
98{
99 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
100 let span = ctx.span(id).expect("Span not found, this is a bug");
101 let mut extensions = span.extensions_mut();
102
103 let trace_id = span
104 .parent()
105 .map(|parent| {
106 parent
107 .extensions()
108 .get::<DatadogSpan>()
109 .expect("Parent span didn't have a DatadogSpan extension, this is a bug")
110 .trace_id
111 })
112 .unwrap_or(rand::random_range(1..=u64::MAX));
113
114 debug_assert!(trace_id != 0, "Trace ID is zero, this is a bug");
115
116 let mut dd_span = DatadogSpan {
117 name: span.name().to_string(),
118 service: self.service.clone(),
119 r#type: "custom".into(),
120 span_id: span.id().into_u64(),
121 start: epoch_ns(),
122 parent_id: span
123 .parent()
124 .map(|parent| parent.id().into_u64())
125 .unwrap_or_default(),
126 trace_id,
127 meta: self.default_tags.clone(),
128 metrics: {
129 let mut m = HashMap::new();
130 if span.parent().is_none() {
131 m.insert("_dd.top_level", 1.0);
133 m.insert("_dd.agent_psr", 1.0);
134 m.insert("_dd.rule_psr", 1.0);
135 m.insert("_dd.limit_psr", 1.0);
136 m.insert("_sample_rate", 1.0);
137 m.insert("_dd.tracer_kr", 1.0);
138 }
139 m.insert("_sampling_priority_v1", 2.0);
140 m.insert("process_id", std::process::id() as f64);
141 m
142 },
143 ..Default::default()
144 };
145
146 attrs.record(&mut SpanAttributeVisitor::new(&mut dd_span));
147
148 extensions.insert(dd_span);
149 }
150
151 fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
152 let span = ctx.span(id).expect("Span not found, this is a bug");
153 let mut extensions = span.extensions_mut();
154
155 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
156 values.record(&mut SpanAttributeVisitor::new(dd_span));
157 }
158 }
159
160 fn on_follows_from(&self, id: &Id, follows: &Id, ctx: Context<'_, S>) {
161 let span = ctx.span(id).expect("Span not found, this is a bug");
162 let mut extensions = span.extensions_mut();
163
164 let Some(other_span) = ctx.span(follows) else {
165 return;
167 };
168
169 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>()
170 && let Some(other_dd_span) = other_span.extensions().get::<DatadogSpan>()
171 {
172 dd_span.span_links.push(SpanLink {
173 trace_id: other_dd_span.trace_id,
174 span_id: other_dd_span.span_id,
175 })
176 }
177 }
178
179 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
180 if !self.logging_enabled {
181 return;
182 }
183
184 let mut fields = {
185 let mut visitor = FieldVisitor::default();
186 event.record(&mut visitor);
187 visitor.finish()
188 };
189
190 fields.extend(
191 ctx.event_scope(event)
192 .into_iter()
193 .flat_map(Scope::from_root)
194 .flat_map(|span| match span.extensions().get::<DatadogSpan>() {
195 Some(dd_span) => dd_span.meta.clone(),
196 None => panic!("DatadogSpan extension not found, this is a bug"),
197 }),
198 );
199
200 let message = fields.remove("message").unwrap_or_default();
201
202 let (trace_id, span_id) = ctx
203 .lookup_current()
204 .and_then(|span| {
205 span.extensions()
206 .get::<DatadogSpan>()
207 .map(|dd_span| (Some(dd_span.trace_id), Some(dd_span.span_id)))
208 })
209 .unwrap_or_default();
210
211 let log = DatadogLog {
212 timestamp: Zoned::now().timestamp(),
213 level: event.metadata().level().to_owned(),
214 message,
215 trace_id,
216 span_id,
217 fields,
218 };
219
220 let serialized = serde_json::to_string(&log).expect("Failed to serialize log");
221
222 println!("{serialized}");
223 }
224
225 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
226 let span = ctx.span(id).expect("Span not found, this is a bug");
227 let mut extensions = span.extensions_mut();
228
229 let now = epoch_ns();
230
231 match extensions.get_mut::<DatadogSpan>() {
232 Some(dd_span) if dd_span.start == 0 => dd_span.start = now,
233 _ => {}
234 }
235 }
236
237 fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
238 let span = ctx.span(id).expect("Span not found, this is a bug");
239 let mut extensions = span.extensions_mut();
240
241 let now = epoch_ns();
242
243 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
244 dd_span.duration = now - dd_span.start
245 }
246 }
247
248 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
249 let span = ctx.span(&id).expect("Span not found, this is a bug");
250 let mut extensions = span.extensions_mut();
251
252 if let Some(mut dd_span) = extensions.remove::<DatadogSpan>() {
253 if let Some("server" | "client" | "consumer" | "producer") =
255 dd_span.meta.get("span.kind").map(String::as_str)
256 {
257 dd_span.metrics.insert("_dd.measured", 1.0);
258 dd_span.metrics.insert("_dd1.sr.eausr", 1.0);
259 }
260
261 self.buffer.lock().unwrap().push(dd_span);
262 }
263 }
264
265 #[cfg(feature = "http")]
268 unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
269 match id {
270 id if id == std::any::TypeId::of::<Self>() => Some(self as *const _ as *const ()),
271 id if id == std::any::TypeId::of::<crate::http::WithContext>() => {
272 Some(&self.with_context as *const _ as *const ())
273 }
274 _ => None,
275 }
276 }
277}
278
/// Collects the configuration for a `DatadogTraceLayer`.
///
/// Obtained from `DatadogTraceLayer::builder` and consumed by `build`, which
/// rejects a builder without a service, an `env` tag, a `version` tag or an
/// agent address.
#[derive(Debug)]
pub struct DatadogTraceLayerBuilder<S> {
    /// Service name; required by `build`.
    service: Option<String>,
    /// Tags copied onto every span; `env` and `version` are stored here.
    default_tags: HashMap<Cow<'static, str>, String>,
    /// Agent address handed to the exporter; required by `build`.
    agent_address: Option<String>,
    /// Optional container id; `build` parses it into an HTTP header value.
    container_id: Option<String>,
    /// Whether the built layer emits events as logs.
    logging_enabled: bool,
    /// Ties the builder to the subscriber type `S` without storing one.
    phantom_data: PhantomData<S>,
}
288
/// Error returned by `DatadogTraceLayerBuilder::build` when the configuration
/// is incomplete or invalid; wraps a static description of the problem.
#[derive(Debug)]
pub struct BuilderError(&'static str);

impl Display for BuilderError {
    /// Writes the wrapped description verbatim.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(description) = self;
        f.write_str(description)
    }
}

impl std::error::Error for BuilderError {}
300
301impl<S> DatadogTraceLayerBuilder<S>
302where
303 S: Subscriber + for<'a> LookupSpan<'a>,
304{
305 pub fn service(mut self, service: impl Into<String>) -> Self {
307 self.service = Some(service.into());
308 self
309 }
310
311 pub fn env(mut self, env: impl Into<String>) -> Self {
313 self.default_tags.insert("env".into(), env.into());
314 self
315 }
316
317 pub fn version(mut self, version: impl Into<String>) -> Self {
319 self.default_tags.insert("version".into(), version.into());
320 self
321 }
322
323 pub fn agent_address(mut self, agent_address: impl Into<String>) -> Self {
325 self.agent_address = Some(agent_address.into());
326 self
327 }
328
329 pub fn default_tag(
335 mut self,
336 key: impl Into<Cow<'static, str>>,
337 value: impl Into<String>,
338 ) -> Self {
339 let _ = self.default_tags.insert(key.into(), value.into());
340 self
341 }
342
343 pub fn container_id(mut self, container_id: impl Into<String>) -> Self {
345 self.container_id = Some(container_id.into());
346 self
347 }
348
349 pub fn enable_logs(mut self, enable_logs: bool) -> Self {
352 self.logging_enabled = enable_logs;
353 self
354 }
355
356 pub fn build(self) -> Result<DatadogTraceLayer<S>, BuilderError> {
358 let Some(service) = self.service else {
359 return Err(BuilderError("service is required"));
360 };
361 if !self.default_tags.contains_key("env") {
362 return Err(BuilderError("env is required"));
363 };
364 if !self.default_tags.contains_key("version") {
365 return Err(BuilderError("version is required"));
366 };
367 let Some(agent_address) = self.agent_address else {
368 return Err(BuilderError("agent_address is required"));
369 };
370 let container_id = match self.container_id {
371 Some(s) => Some(
372 s.parse::<HeaderValue>()
373 .map_err(|_| BuilderError("Failed to parse container ID into header"))?,
374 ),
375 _ => None,
376 };
377
378 let buffer = Arc::new(Mutex::new(Vec::new()));
379 let (shutdown, shutdown_rx) = mpsc::channel();
380
381 spawn(crate::export::exporter(
382 agent_address,
383 buffer.clone(),
384 container_id,
385 shutdown_rx,
386 ));
387
388 Ok(DatadogTraceLayer {
389 buffer,
390 service,
391 default_tags: self.default_tags,
392 logging_enabled: self.logging_enabled,
393 #[cfg(feature = "http")]
394 with_context: crate::http::WithContext(DatadogTraceLayer::<S>::get_context),
395 shutdown,
396 _registry: PhantomData,
397 })
398 }
399}
400
/// Returns the current wall-clock time as nanoseconds since the UNIX epoch.
///
/// This runs on every span callback, so it never panics: a clock set before
/// the epoch yields `0`, and a value too large for `i64` saturates at
/// `i64::MAX` instead of being truncated.
fn epoch_ns() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_nanos()).unwrap_or(i64::MAX)
        })
}
408
#[cfg(test)]
mod tests {
    use super::*;
    use tracing_subscriber::Registry;

    type TestLayer = DatadogTraceLayer<Registry>;
    type TestBuilder = DatadogTraceLayerBuilder<Registry>;

    /// Builds `builder`, expecting failure, and returns the error text.
    fn build_error(builder: TestBuilder) -> String {
        builder.build().unwrap_err().to_string()
    }

    #[test]
    fn builder_builds_successfully() {
        let result = TestLayer::builder()
            .service("test-service")
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126")
            .build();
        assert!(result.is_ok());
    }

    #[test]
    fn service_is_required() {
        let builder = TestLayer::builder()
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126");
        assert!(build_error(builder).contains("service"));
    }

    #[test]
    fn env_is_required() {
        let builder = TestLayer::builder()
            .service("test-service")
            .version("test-version")
            .agent_address("localhost:8126");
        assert!(build_error(builder).contains("env"));
    }

    #[test]
    fn version_is_required() {
        let builder = TestLayer::builder()
            .service("test-service")
            .env("test")
            .agent_address("localhost:8126");
        assert!(build_error(builder).contains("version"));
    }

    #[test]
    fn agent_address_is_required() {
        let builder = TestLayer::builder()
            .service("test-service")
            .env("test")
            .version("test-version");
        assert!(build_error(builder).contains("agent_address"));
    }

    #[test]
    fn default_default_tags_include_env_and_version() {
        let layer: TestLayer = TestLayer::builder()
            .service("test-service")
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126")
            .build()
            .unwrap();
        let tags = &layer.default_tags;
        assert_eq!(tags["env"], "test");
        assert_eq!(tags["version"], "test-version");
    }

    #[test]
    fn default_tags_can_be_added() {
        let layer: TestLayer = TestLayer::builder()
            .service("test-service")
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126")
            .default_tag("static", "bar")
            .default_tag(String::from("dynamic"), "qux")
            .build()
            .unwrap();
        let tags = &layer.default_tags;
        assert_eq!(tags["static"], "bar");
        assert_eq!(tags["dynamic"], "qux");
    }
}