1use crate::{
2 log::{FieldVisitor, Log},
3 span::{Span, SpanAttributeVisitor, SpanLink},
4};
5#[cfg(feature = "ahash")]
6use ahash::AHashMap as HashMap;
7use jiff::Zoned;
8use reqwest::header::HeaderValue;
9#[cfg(not(feature = "ahash"))]
10use std::collections::HashMap;
11use std::{
12 borrow::Cow,
13 fmt::{Display, Formatter},
14 marker::PhantomData,
15 sync::{Arc, Mutex, mpsc},
16 thread::spawn,
17 time::{SystemTime, UNIX_EPOCH},
18};
19use tracing_core::{
20 Event, Subscriber,
21 span::{Attributes, Id, Record},
22};
23use tracing_subscriber::{
24 Layer,
25 layer::Context,
26 registry::{LookupSpan, Scope},
27};
28
29#[derive(Debug)]
47pub struct DatadogTraceLayer<S> {
48 buffer: Arc<Mutex<Vec<Span>>>,
49 service: String,
50 default_tags: HashMap<Cow<'static, str>, String>,
51 logging_enabled: bool,
52 with_context: crate::context::WithContext,
53 shutdown: mpsc::Sender<()>,
54 _registry: PhantomData<S>,
55}
56
57impl<S> DatadogTraceLayer<S>
58where
59 S: Subscriber + for<'a> LookupSpan<'a>,
60{
61 pub fn builder() -> DatadogTraceLayerBuilder<S> {
63 DatadogTraceLayerBuilder {
64 service: None,
65 default_tags: HashMap::from_iter([("span.kind".into(), "internal".to_string())]),
66 agent_address: None,
67 container_id: None,
68 logging_enabled: false,
69 phantom_data: Default::default(),
70 }
71 }
72
73 fn get_context(dispatch: &tracing_core::Dispatch, id: &Id, f: &mut dyn FnMut(&mut Span)) {
74 let subscriber = dispatch
75 .downcast_ref::<S>()
76 .expect("Subscriber did not downcast to expected type, this is a bug");
77 let span = subscriber.span(id).expect("Span not found, this is a bug");
78
79 let mut extensions = span.extensions_mut();
80 if let Some(dd_span) = extensions.get_mut::<Span>() {
81 f(dd_span);
82 }
83 }
84}
85
86impl<S> Drop for DatadogTraceLayer<S> {
87 fn drop(&mut self) {
88 let _ = self.shutdown.send(());
89 }
90}
91
92impl<S> Layer<S> for DatadogTraceLayer<S>
93where
94 S: Subscriber + for<'a> LookupSpan<'a>,
95{
96 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
97 let span = ctx.span(id).expect("Span not found, this is a bug");
98 let mut extensions = span.extensions_mut();
99
100 let trace_id = span
101 .parent()
102 .map(|parent| {
103 parent
104 .extensions()
105 .get::<Span>()
106 .expect("Parent span didn't have a DatadogSpan extension, this is a bug")
107 .trace_id
108 })
109 .unwrap_or(rand::random_range(1..=u128::MAX));
110
111 debug_assert!(trace_id != 0, "Trace ID is zero, this is a bug");
112
113 let mut dd_span = Span {
114 name: span.name().to_string(),
115 service: self.service.clone(),
116 r#type: "custom".into(),
117 span_id: span.id().into_u64(),
118 start: epoch_ns(),
119 parent_id: span
120 .parent()
121 .map(|parent| parent.id().into_u64())
122 .unwrap_or_default(),
123 trace_id,
124 meta: self.default_tags.clone(),
125 metrics: {
126 let mut m = HashMap::new();
127 if span.parent().is_none() {
128 m.insert("_dd.top_level", 1.0);
130 m.insert("_dd.agent_psr", 1.0);
131 m.insert("_dd.rule_psr", 1.0);
132 m.insert("_dd.limit_psr", 1.0);
133 m.insert("_sample_rate", 1.0);
134 m.insert("_dd.tracer_kr", 1.0);
135 }
136 m.insert("_sampling_priority_v1", 2.0);
137 m.insert("process_id", std::process::id() as f64);
138 m
139 },
140 ..Default::default()
141 };
142
143 attrs.record(&mut SpanAttributeVisitor::new(&mut dd_span));
144
145 extensions.insert(dd_span);
146 }
147
148 fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
149 let span = ctx.span(id).expect("Span not found, this is a bug");
150 let mut extensions = span.extensions_mut();
151
152 if let Some(dd_span) = extensions.get_mut::<Span>() {
153 values.record(&mut SpanAttributeVisitor::new(dd_span));
154 }
155 }
156
157 fn on_follows_from(&self, id: &Id, follows: &Id, ctx: Context<'_, S>) {
158 let span = ctx.span(id).expect("Span not found, this is a bug");
159 let mut extensions = span.extensions_mut();
160
161 let Some(other_span) = ctx.span(follows) else {
162 return;
164 };
165
166 if let Some(dd_span) = extensions.get_mut::<Span>()
167 && let Some(other_dd_span) = other_span.extensions().get::<Span>()
168 {
169 dd_span.span_links.push(SpanLink {
170 trace_id: other_dd_span.trace_id,
171 span_id: other_dd_span.span_id,
172 })
173 }
174 }
175
176 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
177 if !self.logging_enabled {
178 return;
179 }
180
181 let mut fields = {
182 let mut visitor = FieldVisitor::default();
183 event.record(&mut visitor);
184 visitor.finish()
185 };
186
187 fields.extend(
188 ctx.event_scope(event)
189 .into_iter()
190 .flat_map(Scope::from_root)
191 .flat_map(|span| match span.extensions().get::<Span>() {
192 Some(dd_span) => dd_span.meta.clone(),
193 None => panic!("Datadog Span extension not found, this is a bug"),
194 }),
195 );
196
197 let message = fields.remove("message").unwrap_or_default();
198
199 let trace_context = ctx.lookup_current().and_then(|span| {
200 span.extensions()
201 .get::<Span>()
202 .map(|dd_span| (dd_span.trace_id, dd_span.span_id))
203 });
204
205 let log = Log {
206 timestamp: Zoned::now().timestamp(),
207 level: event.metadata().level().to_owned(),
208 message,
209 trace_context,
210 fields,
211 };
212
213 let serialized = serde_json::to_string(&log).expect("Failed to serialize log");
214
215 println!("{serialized}");
216 }
217
218 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
219 let span = ctx.span(id).expect("Span not found, this is a bug");
220 let mut extensions = span.extensions_mut();
221
222 let now = epoch_ns();
223
224 match extensions.get_mut::<Span>() {
225 Some(dd_span) if dd_span.start == 0 => dd_span.start = now,
226 _ => {}
227 }
228 }
229
230 fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
231 let span = ctx.span(id).expect("Span not found, this is a bug");
232 let mut extensions = span.extensions_mut();
233
234 let now = epoch_ns();
235
236 if let Some(dd_span) = extensions.get_mut::<Span>() {
237 dd_span.duration = now - dd_span.start
238 }
239 }
240
241 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
242 let span = ctx.span(&id).expect("Span not found, this is a bug");
243 let mut extensions = span.extensions_mut();
244
245 if let Some(mut dd_span) = extensions.remove::<Span>() {
246 if let Some("server" | "client" | "consumer" | "producer") =
248 dd_span.meta.get("span.kind").map(String::as_str)
249 {
250 dd_span.metrics.insert("_dd.measured", 1.0);
251 dd_span.metrics.insert("_dd1.sr.eausr", 1.0);
252 }
253
254 self.buffer.lock().unwrap().push(dd_span);
255 }
256 }
257
258 unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
261 match id {
262 id if id == std::any::TypeId::of::<Self>() => Some(self as *const _ as *const ()),
263 id if id == std::any::TypeId::of::<crate::context::WithContext>() => {
264 Some(&self.with_context as *const _ as *const ())
265 }
266 _ => None,
267 }
268 }
269}
270
271pub struct DatadogTraceLayerBuilder<S> {
273 service: Option<String>,
274 default_tags: HashMap<Cow<'static, str>, String>,
275 agent_address: Option<String>,
276 container_id: Option<String>,
277 logging_enabled: bool,
278 phantom_data: PhantomData<S>,
279}
280
281#[derive(Debug)]
283pub struct BuilderError(&'static str);
284
285impl Display for BuilderError {
286 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
287 f.write_str(self.0)
288 }
289}
290
291impl std::error::Error for BuilderError {}
292
293impl<S> DatadogTraceLayerBuilder<S>
294where
295 S: Subscriber + for<'a> LookupSpan<'a>,
296{
297 pub fn service(mut self, service: impl Into<String>) -> Self {
299 self.service = Some(service.into());
300 self
301 }
302
303 pub fn env(mut self, env: impl Into<String>) -> Self {
305 self.default_tags.insert("env".into(), env.into());
306 self
307 }
308
309 pub fn version(mut self, version: impl Into<String>) -> Self {
311 self.default_tags.insert("version".into(), version.into());
312 self
313 }
314
315 pub fn agent_address(mut self, agent_address: impl Into<String>) -> Self {
317 self.agent_address = Some(agent_address.into());
318 self
319 }
320
321 pub fn default_tag(
327 mut self,
328 key: impl Into<Cow<'static, str>>,
329 value: impl Into<String>,
330 ) -> Self {
331 let _ = self.default_tags.insert(key.into(), value.into());
332 self
333 }
334
335 pub fn container_id(mut self, container_id: impl Into<String>) -> Self {
337 self.container_id = Some(container_id.into());
338 self
339 }
340
341 pub fn enable_logs(mut self, enable_logs: bool) -> Self {
344 self.logging_enabled = enable_logs;
345 self
346 }
347
348 pub fn build(self) -> Result<DatadogTraceLayer<S>, BuilderError> {
350 let Some(service) = self.service else {
351 return Err(BuilderError("service is required"));
352 };
353 if !self.default_tags.contains_key("env") {
354 return Err(BuilderError("env is required"));
355 };
356 if !self.default_tags.contains_key("version") {
357 return Err(BuilderError("version is required"));
358 };
359 let Some(agent_address) = self.agent_address else {
360 return Err(BuilderError("agent_address is required"));
361 };
362 let container_id = match self.container_id {
363 Some(s) => Some(
364 s.parse::<HeaderValue>()
365 .map_err(|_| BuilderError("Failed to parse container ID into header"))?,
366 ),
367 _ => None,
368 };
369
370 let buffer = Arc::new(Mutex::new(Vec::new()));
371 let (shutdown, shutdown_rx) = mpsc::channel();
372
373 spawn(crate::export::exporter(
374 agent_address,
375 buffer.clone(),
376 container_id,
377 shutdown_rx,
378 ));
379
380 Ok(DatadogTraceLayer {
381 buffer,
382 service,
383 default_tags: self.default_tags,
384 logging_enabled: self.logging_enabled,
385 with_context: crate::context::WithContext(DatadogTraceLayer::<S>::get_context),
386 shutdown,
387 _registry: PhantomData,
388 })
389 }
390}
391
392fn epoch_ns() -> i64 {
394 SystemTime::now()
395 .duration_since(UNIX_EPOCH)
396 .expect("SystemTime is before UNIX epoch")
397 .as_nanos() as i64
398}
399
400#[cfg(test)]
401mod tests {
402 use super::*;
403
404 #[test]
405 fn builder_builds_successfully() {
406 assert!(
407 DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
408 .service("test-service")
409 .env("test")
410 .version("test-version")
411 .agent_address("localhost:8126")
412 .build()
413 .is_ok()
414 );
415 }
416
417 #[test]
418 fn service_is_required() {
419 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
420 .env("test")
421 .version("test-version")
422 .agent_address("localhost:8126")
423 .build();
424 assert!(result.unwrap_err().to_string().contains("service"));
425 }
426
427 #[test]
428 fn env_is_required() {
429 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
430 .service("test-service")
431 .version("test-version")
432 .agent_address("localhost:8126")
433 .build();
434 assert!(result.unwrap_err().to_string().contains("env"));
435 }
436
437 #[test]
438 fn version_is_required() {
439 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
440 .service("test-service")
441 .env("test")
442 .agent_address("localhost:8126")
443 .build();
444 assert!(result.unwrap_err().to_string().contains("version"));
445 }
446
447 #[test]
448 fn agent_address_is_required() {
449 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
450 .service("test-service")
451 .env("test")
452 .version("test-version")
453 .build();
454 assert!(result.unwrap_err().to_string().contains("agent_address"));
455 }
456
457 #[test]
458 fn default_default_tags_include_env_and_version() {
459 let layer: DatadogTraceLayer<tracing_subscriber::Registry> = DatadogTraceLayer::builder()
460 .service("test-service")
461 .env("test")
462 .version("test-version")
463 .agent_address("localhost:8126")
464 .build()
465 .unwrap();
466 let default_tags = &layer.default_tags;
467 assert_eq!(default_tags["env"], "test");
468 assert_eq!(default_tags["version"], "test-version");
469 }
470
471 #[test]
472 fn default_tags_can_be_added() {
473 let layer: DatadogTraceLayer<tracing_subscriber::Registry> = DatadogTraceLayer::builder()
474 .service("test-service")
475 .env("test")
476 .version("test-version")
477 .agent_address("localhost:8126")
478 .default_tag("static", "bar")
479 .default_tag(String::from("dynamic"), "qux")
480 .build()
481 .unwrap();
482 let default_tags = &layer.default_tags;
483 assert_eq!(default_tags["static"], "bar");
484 assert_eq!(default_tags["dynamic"], "qux");
485 }
486}