1use crate::export::ApiVersion;
2use crate::{
3 log::{FieldVisitor, Log},
4 span::{Span, SpanAttributeVisitor, SpanLink},
5};
6#[cfg(feature = "ahash")]
7use ahash::AHashMap as HashMap;
8use jiff::Zoned;
9use reqwest::header::HeaderValue;
10#[cfg(not(feature = "ahash"))]
11use std::collections::HashMap;
12use std::{
13 borrow::Cow,
14 fmt::{Display, Formatter},
15 marker::PhantomData,
16 sync::{Arc, Mutex, mpsc},
17 thread::spawn,
18 time::{SystemTime, UNIX_EPOCH},
19};
20use tracing_core::{
21 Event, Subscriber,
22 span::{Attributes, Id, Record},
23};
24use tracing_subscriber::{
25 Layer,
26 layer::Context,
27 registry::{LookupSpan, Scope},
28};
29
30#[derive(Debug)]
48pub struct DatadogTraceLayer<S> {
49 buffer: Arc<Mutex<Vec<Span>>>,
50 service: String,
51 default_tags: HashMap<Cow<'static, str>, String>,
52 logging_enabled: bool,
53 with_context: crate::context::WithContext,
54 shutdown: mpsc::Sender<()>,
55 _registry: PhantomData<S>,
56}
57
58impl<S> DatadogTraceLayer<S>
59where
60 S: Subscriber + for<'a> LookupSpan<'a>,
61{
62 pub fn builder() -> DatadogTraceLayerBuilder<S> {
64 DatadogTraceLayerBuilder {
65 service: None,
66 default_tags: HashMap::from_iter([("span.kind".into(), "internal".to_string())]),
67 agent_address: None,
68 api_version: ApiVersion::V04,
69 container_id: None,
70 logging_enabled: false,
71 phantom_data: Default::default(),
72 }
73 }
74
75 fn get_context(dispatch: &tracing_core::Dispatch, id: &Id, f: &mut dyn FnMut(&mut Span)) {
76 let subscriber = dispatch
77 .downcast_ref::<S>()
78 .expect("Subscriber did not downcast to expected type, this is a bug");
79 let span = subscriber.span(id).expect("Span not found, this is a bug");
80
81 let mut extensions = span.extensions_mut();
82 if let Some(dd_span) = extensions.get_mut::<Span>() {
83 f(dd_span);
84 }
85 }
86}
87
88impl<S> Drop for DatadogTraceLayer<S> {
89 fn drop(&mut self) {
90 let _ = self.shutdown.send(());
91 }
92}
93
94impl<S> Layer<S> for DatadogTraceLayer<S>
95where
96 S: Subscriber + for<'a> LookupSpan<'a>,
97{
98 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
99 let span = ctx.span(id).expect("Span not found, this is a bug");
100 let mut extensions = span.extensions_mut();
101
102 let trace_id = span
103 .parent()
104 .map(|parent| {
105 parent
106 .extensions()
107 .get::<Span>()
108 .expect("Parent span didn't have a DatadogSpan extension, this is a bug")
109 .trace_id
110 })
111 .unwrap_or(rand::random_range(1..=u128::MAX));
112
113 debug_assert!(trace_id != 0, "Trace ID is zero, this is a bug");
114
115 let mut dd_span = Span {
116 name: span.name().to_string(),
117 service: self.service.clone(),
118 r#type: "custom".into(),
119 span_id: span.id().into_u64(),
120 start: epoch_ns(),
121 parent_id: span
122 .parent()
123 .map(|parent| parent.id().into_u64())
124 .unwrap_or_default(),
125 trace_id,
126 meta: self.default_tags.clone(),
127 metrics: {
128 let mut m = HashMap::new();
129 if span.parent().is_none() {
130 m.insert("_dd.top_level".into(), 1.0);
132 m.insert("_dd.agent_psr".into(), 1.0);
133 m.insert("_dd.rule_psr".into(), 1.0);
134 m.insert("_dd.limit_psr".into(), 1.0);
135 m.insert("_sample_rate".into(), 1.0);
136 m.insert("_dd.tracer_kr".into(), 1.0);
137 }
138 m.insert("_sampling_priority_v1".into(), 2.0);
139 m.insert("process_id".into(), std::process::id() as f64);
140 m
141 },
142 ..Default::default()
143 };
144
145 attrs.record(&mut SpanAttributeVisitor::new(&mut dd_span));
146
147 extensions.insert(dd_span);
148 }
149
150 fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
151 let span = ctx.span(id).expect("Span not found, this is a bug");
152 let mut extensions = span.extensions_mut();
153
154 if let Some(dd_span) = extensions.get_mut::<Span>() {
155 values.record(&mut SpanAttributeVisitor::new(dd_span));
156 }
157 }
158
159 fn on_follows_from(&self, id: &Id, follows: &Id, ctx: Context<'_, S>) {
160 let span = ctx.span(id).expect("Span not found, this is a bug");
161 let mut extensions = span.extensions_mut();
162
163 let Some(other_span) = ctx.span(follows) else {
164 return;
166 };
167
168 if let Some(dd_span) = extensions.get_mut::<Span>()
169 && let Some(other_dd_span) = other_span.extensions().get::<Span>()
170 {
171 dd_span.span_links.push(SpanLink {
172 trace_id: other_dd_span.trace_id,
173 span_id: other_dd_span.span_id,
174 })
175 }
176 }
177
178 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
179 if !self.logging_enabled {
180 return;
181 }
182
183 let mut fields = {
184 let mut visitor = FieldVisitor::default();
185 event.record(&mut visitor);
186 visitor.finish()
187 };
188
189 fields.extend(
190 ctx.event_scope(event)
191 .into_iter()
192 .flat_map(Scope::from_root)
193 .flat_map(|span| match span.extensions().get::<Span>() {
194 Some(dd_span) => dd_span.meta.clone(),
195 None => panic!("Datadog Span extension not found, this is a bug"),
196 }),
197 );
198
199 let message = fields.remove("message").unwrap_or_default();
200
201 let trace_context = ctx.lookup_current().and_then(|span| {
202 span.extensions()
203 .get::<Span>()
204 .map(|dd_span| (dd_span.trace_id, dd_span.span_id))
205 });
206
207 let log = Log {
208 timestamp: Zoned::now().timestamp(),
209 level: event.metadata().level().to_owned(),
210 message,
211 trace_context,
212 fields,
213 };
214
215 let serialized = serde_json::to_string(&log).expect("Failed to serialize log");
216
217 println!("{serialized}");
218 }
219
220 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
221 let span = ctx.span(id).expect("Span not found, this is a bug");
222 let mut extensions = span.extensions_mut();
223
224 let now = epoch_ns();
225
226 match extensions.get_mut::<Span>() {
227 Some(dd_span) if dd_span.start == 0 => dd_span.start = now,
228 _ => {}
229 }
230 }
231
232 fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
233 let span = ctx.span(id).expect("Span not found, this is a bug");
234 let mut extensions = span.extensions_mut();
235
236 let now = epoch_ns();
237
238 if let Some(dd_span) = extensions.get_mut::<Span>() {
239 dd_span.duration = now - dd_span.start
240 }
241 }
242
243 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
244 let span = ctx.span(&id).expect("Span not found, this is a bug");
245 let mut extensions = span.extensions_mut();
246
247 if let Some(mut dd_span) = extensions.remove::<Span>() {
248 if let Some("server" | "client" | "consumer" | "producer") =
250 dd_span.meta.get("span.kind").map(String::as_str)
251 {
252 dd_span.metrics.insert("_dd.measured".into(), 1.0);
253 dd_span.metrics.insert("_dd1.sr.eausr".into(), 1.0);
254 }
255
256 self.buffer.lock().unwrap().push(dd_span);
257 }
258 }
259
260 unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
263 match id {
264 id if id == std::any::TypeId::of::<Self>() => Some(self as *const _ as *const ()),
265 id if id == std::any::TypeId::of::<crate::context::WithContext>() => {
266 Some(&self.with_context as *const _ as *const ())
267 }
268 _ => None,
269 }
270 }
271}
272
273pub struct DatadogTraceLayerBuilder<S> {
275 service: Option<String>,
276 default_tags: HashMap<Cow<'static, str>, String>,
277 agent_address: Option<String>,
278 api_version: ApiVersion,
279 container_id: Option<String>,
280 logging_enabled: bool,
281 phantom_data: PhantomData<S>,
282}
283
284#[derive(Debug)]
286pub struct BuilderError(&'static str);
287
288impl Display for BuilderError {
289 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
290 f.write_str(self.0)
291 }
292}
293
294impl std::error::Error for BuilderError {}
295
296impl<S> DatadogTraceLayerBuilder<S>
297where
298 S: Subscriber + for<'a> LookupSpan<'a>,
299{
300 pub fn service(mut self, service: impl Into<String>) -> Self {
302 self.service = Some(service.into());
303 self
304 }
305
306 pub fn env(mut self, env: impl Into<String>) -> Self {
308 self.default_tags.insert("env".into(), env.into());
309 self
310 }
311
312 pub fn version(mut self, version: impl Into<String>) -> Self {
314 self.default_tags.insert("version".into(), version.into());
315 self
316 }
317
318 pub fn agent_address(mut self, agent_address: impl Into<String>) -> Self {
320 self.agent_address = Some(agent_address.into());
321 self
322 }
323
324 pub fn api_version(mut self, api_version: ApiVersion) -> Self {
326 self.api_version = api_version;
327 self
328 }
329
330 pub fn default_tag(
336 mut self,
337 key: impl Into<Cow<'static, str>>,
338 value: impl Into<String>,
339 ) -> Self {
340 let _ = self.default_tags.insert(key.into(), value.into());
341 self
342 }
343
344 pub fn container_id(mut self, container_id: impl Into<String>) -> Self {
346 self.container_id = Some(container_id.into());
347 self
348 }
349
350 pub fn enable_logs(mut self, enable_logs: bool) -> Self {
353 self.logging_enabled = enable_logs;
354 self
355 }
356
357 pub fn build(self) -> Result<DatadogTraceLayer<S>, BuilderError> {
359 let Some(service) = self.service else {
360 return Err(BuilderError("service is required"));
361 };
362 if !self.default_tags.contains_key("env") {
363 return Err(BuilderError("env is required"));
364 };
365 if !self.default_tags.contains_key("version") {
366 return Err(BuilderError("version is required"));
367 };
368 let Some(agent_address) = self.agent_address else {
369 return Err(BuilderError("agent_address is required"));
370 };
371 let container_id = match self.container_id {
372 Some(s) => Some(
373 s.parse::<HeaderValue>()
374 .map_err(|_| BuilderError("Failed to parse container ID into header"))?,
375 ),
376 _ => None,
377 };
378
379 let buffer = Arc::new(Mutex::new(Vec::new()));
380 let (shutdown, shutdown_rx) = mpsc::channel();
381
382 spawn(crate::export::exporter(
383 agent_address,
384 self.api_version,
385 buffer.clone(),
386 container_id,
387 shutdown_rx,
388 ));
389
390 Ok(DatadogTraceLayer {
391 buffer,
392 service,
393 default_tags: self.default_tags,
394 logging_enabled: self.logging_enabled,
395 with_context: crate::context::WithContext(DatadogTraceLayer::<S>::get_context),
396 shutdown,
397 _registry: PhantomData,
398 })
399 }
400}
401
402fn epoch_ns() -> i64 {
404 SystemTime::now()
405 .duration_since(UNIX_EPOCH)
406 .expect("SystemTime is before UNIX epoch")
407 .as_nanos() as i64
408}
409
410#[cfg(test)]
411mod tests {
412 use super::*;
413
414 #[test]
415 fn builder_builds_successfully() {
416 assert!(
417 DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
418 .service("test-service")
419 .env("test")
420 .version("test-version")
421 .agent_address("localhost:8126")
422 .build()
423 .is_ok()
424 );
425 }
426
427 #[test]
428 fn service_is_required() {
429 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
430 .env("test")
431 .version("test-version")
432 .agent_address("localhost:8126")
433 .build();
434 assert!(result.unwrap_err().to_string().contains("service"));
435 }
436
437 #[test]
438 fn env_is_required() {
439 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
440 .service("test-service")
441 .version("test-version")
442 .agent_address("localhost:8126")
443 .build();
444 assert!(result.unwrap_err().to_string().contains("env"));
445 }
446
447 #[test]
448 fn version_is_required() {
449 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
450 .service("test-service")
451 .env("test")
452 .agent_address("localhost:8126")
453 .build();
454 assert!(result.unwrap_err().to_string().contains("version"));
455 }
456
457 #[test]
458 fn agent_address_is_required() {
459 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
460 .service("test-service")
461 .env("test")
462 .version("test-version")
463 .build();
464 assert!(result.unwrap_err().to_string().contains("agent_address"));
465 }
466
467 #[test]
468 fn default_default_tags_include_env_and_version() {
469 let layer: DatadogTraceLayer<tracing_subscriber::Registry> = DatadogTraceLayer::builder()
470 .service("test-service")
471 .env("test")
472 .version("test-version")
473 .agent_address("localhost:8126")
474 .build()
475 .unwrap();
476 let default_tags = &layer.default_tags;
477 assert_eq!(default_tags["env"], "test");
478 assert_eq!(default_tags["version"], "test-version");
479 }
480
481 #[test]
482 fn default_tags_can_be_added() {
483 let layer: DatadogTraceLayer<tracing_subscriber::Registry> = DatadogTraceLayer::builder()
484 .service("test-service")
485 .env("test")
486 .version("test-version")
487 .agent_address("localhost:8126")
488 .default_tag("static", "bar")
489 .default_tag(String::from("dynamic"), "qux")
490 .build()
491 .unwrap();
492 let default_tags = &layer.default_tags;
493 assert_eq!(default_tags["static"], "bar");
494 assert_eq!(default_tags["dynamic"], "qux");
495 }
496}