1#![doc = include_str!("../README.md")]
2
3use jiff::{Timestamp, Zoned};
4use rmp_serde::Serializer as MpSerializer;
5use serde::{Serialize, Serializer};
6use std::{
7 collections::HashMap,
8 fmt::{Debug, Display, Formatter, Write},
9 marker::PhantomData,
10 sync::{Arc, Mutex, mpsc},
11 thread::{sleep, spawn},
12 time::{Duration, SystemTime, UNIX_EPOCH},
13};
14use tracing_core::{
15 Event, Field, Level, Subscriber,
16 field::Visit,
17 span::{Attributes, Id, Record},
18};
19use tracing_subscriber::{
20 Layer,
21 layer::Context,
22 registry::{LookupSpan, Scope},
23};
24
25pub struct DataDogTraceLayer<S> {
43 buffer: Arc<Mutex<Vec<DataDogSpan>>>,
44 service: String,
45 env: String,
46 version: String,
47 logging_enabled: bool,
48 #[cfg(feature = "http")]
49 with_context: http::WithContext,
50 shutdown: mpsc::Sender<()>,
51 _registry: PhantomData<S>,
52}
53
54impl<S> DataDogTraceLayer<S>
55where
56 S: Subscriber + for<'a> LookupSpan<'a>,
57{
58 pub fn builder() -> DataDogTraceLayerBuilder<S> {
60 DataDogTraceLayerBuilder {
61 service: None,
62 env: None,
63 version: None,
64 agent_address: None,
65 container_id: None,
66 logging_enabled: false,
67 phantom_data: Default::default(),
68 }
69 }
70
71 #[cfg(feature = "http")]
72 fn get_context(
73 dispatch: &tracing_core::Dispatch,
74 id: &Id,
75 f: &mut dyn FnMut(&mut DataDogSpan),
76 ) {
77 let subscriber = dispatch
78 .downcast_ref::<S>()
79 .expect("Subscriber did not downcast to expected type, this is a bug");
80 let span = subscriber.span(id).expect("Span not found, this is a bug");
81
82 let mut extensions = span.extensions_mut();
83 if let Some(dd_span) = extensions.get_mut::<DataDogSpan>() {
84 f(dd_span);
85 }
86 }
87}
88
89impl<S> Drop for DataDogTraceLayer<S> {
90 fn drop(&mut self) {
91 let _ = self.shutdown.send(());
92 }
93}
94
95impl<S> Layer<S> for DataDogTraceLayer<S>
96where
97 S: Subscriber + for<'a> LookupSpan<'a>,
98{
99 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
100 let span = ctx.span(id).expect("Span not found, this is a bug");
101 let mut extensions = span.extensions_mut();
102
103 let trace_id = span
104 .parent()
105 .and_then(|parent| {
106 parent
107 .extensions()
108 .get::<DataDogSpan>()
109 .map(|dd_span| dd_span.trace_id)
110 })
111 .unwrap_or(rand::random());
112
113 let mut dd_span = DataDogSpan {
114 name: span.name().to_string(),
115 service: self.service.clone(),
116 r#type: "internal".into(),
117 span_id: span.id().into_u64(),
118 start: epoch_ns(),
119 parent_id: span
120 .parent()
121 .map(|parent| parent.id().into_u64())
122 .unwrap_or_default(),
123 trace_id,
124 meta: HashMap::from_iter([
125 ("env".into(), self.env.clone()),
126 ("version".into(), self.version.clone()),
127 ]),
128 ..Default::default()
129 };
130
131 attrs.record(&mut SpanAttributeVisitor::new(&mut dd_span));
132
133 extensions.insert(dd_span);
134 }
135
136 fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
137 let span = ctx.span(id).expect("Span not found, this is a bug");
138 let mut extensions = span.extensions_mut();
139
140 if let Some(dd_span) = extensions.get_mut::<DataDogSpan>() {
141 values.record(&mut SpanAttributeVisitor::new(dd_span));
142 }
143 }
144
145 fn on_follows_from(&self, id: &Id, follows: &Id, ctx: Context<'_, S>) {
146 let span = ctx.span(id).expect("Span not found, this is a bug");
147 let mut extensions = span.extensions_mut();
148
149 if let Some(dd_span) = extensions.get_mut::<DataDogSpan>() {
150 dd_span.parent_id = follows.into_u64();
151 }
152 }
153
154 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
155 if !self.logging_enabled {
156 return;
157 }
158
159 let mut fields = {
160 let mut visitor = FieldVisitor::default();
161 event.record(&mut visitor);
162 visitor.fields
163 };
164
165 let mut message = fields.remove("message").unwrap_or_default();
166
167 fields.extend(
168 ctx.event_scope(event)
169 .into_iter()
170 .flat_map(Scope::from_root)
171 .flat_map(|span| match span.extensions().get::<DataDogSpan>() {
172 Some(dd_span) => dd_span.meta.clone(),
173 None => panic!("Span not found, this is a bug"),
174 }),
175 );
176
177 fields
178 .into_iter()
179 .try_for_each(|(k, v)| write!(&mut message, " {k}={v}"))
180 .expect("Failed to write message");
181
182 let (trace_id, span_id) = ctx
183 .lookup_current()
184 .and_then(|span| {
185 span.extensions()
186 .get::<DataDogSpan>()
187 .map(|dd_span| (Some(dd_span.trace_id), Some(dd_span.span_id)))
188 })
189 .unwrap_or_default();
190
191 let log = DataDogLog {
192 timestamp: Zoned::now().timestamp(),
193 level: event.metadata().level().to_owned(),
194 message,
195 trace_id,
196 span_id,
197 };
198
199 let serialized = serde_json::to_string(&log).expect("Failed to serialize log");
200
201 println!("{serialized}");
202 }
203
204 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
205 let span = ctx.span(id).expect("Span not found, this is a bug");
206 let mut extensions = span.extensions_mut();
207
208 let now = epoch_ns();
209
210 match extensions.get_mut::<DataDogSpan>() {
211 Some(dd_span) if dd_span.start == 0 => dd_span.start = now,
212 _ => {}
213 }
214 }
215
216 fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
217 let span = ctx.span(id).expect("Span not found, this is a bug");
218 let mut extensions = span.extensions_mut();
219
220 let now = epoch_ns();
221
222 if let Some(dd_span) = extensions.get_mut::<DataDogSpan>() {
223 dd_span.duration = now - dd_span.start
224 }
225 }
226
227 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
228 let span = ctx.span(&id).expect("Span not found, this is a bug");
229 let mut extensions = span.extensions_mut();
230
231 if let Some(dd_span) = extensions.remove::<DataDogSpan>() {
232 self.buffer.lock().unwrap().push(dd_span);
233 }
234 }
235
236 #[cfg(feature = "http")]
239 unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
240 match id {
241 id if id == std::any::TypeId::of::<Self>() => Some(self as *const _ as *const ()),
242 id if id == std::any::TypeId::of::<http::WithContext>() => {
243 Some(&self.with_context as *const _ as *const ())
244 }
245 _ => None,
246 }
247 }
248}
249
250pub struct DataDogTraceLayerBuilder<S> {
252 service: Option<String>,
253 env: Option<String>,
254 version: Option<String>,
255 agent_address: Option<String>,
256 container_id: Option<String>,
257 logging_enabled: bool,
258 phantom_data: PhantomData<S>,
259}
260
261#[derive(Debug)]
263pub struct BuilderError(&'static str);
264
265impl Display for BuilderError {
266 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
267 f.write_str(self.0)
268 }
269}
270
271impl std::error::Error for BuilderError {}
272
273impl<S> DataDogTraceLayerBuilder<S>
274where
275 S: Subscriber + for<'a> LookupSpan<'a>,
276{
277 pub fn service(mut self, service: impl Into<String>) -> Self {
279 self.service = Some(service.into());
280 self
281 }
282
283 pub fn env(mut self, env: impl Into<String>) -> Self {
285 self.env = Some(env.into());
286 self
287 }
288
289 pub fn version(mut self, version: impl Into<String>) -> Self {
291 self.version = Some(version.into());
292 self
293 }
294
295 pub fn agent_address(mut self, agent_address: impl Into<String>) -> Self {
297 self.agent_address = Some(agent_address.into());
298 self
299 }
300
301 pub fn container_id(mut self, container_id: impl Into<String>) -> Self {
303 self.container_id = Some(container_id.into());
304 self
305 }
306
307 pub fn enable_logs(mut self, enable_logs: bool) -> Self {
310 self.logging_enabled = enable_logs;
311 self
312 }
313
314 pub fn build(self) -> Result<DataDogTraceLayer<S>, BuilderError> {
316 let Some(service) = self.service else {
317 return Err(BuilderError("service is required"));
318 };
319 let Some(env) = self.env else {
320 return Err(BuilderError("env is required"));
321 };
322 let Some(version) = self.version else {
323 return Err(BuilderError("version is required"));
324 };
325 let Some(agent_address) = self.agent_address else {
326 return Err(BuilderError("agent_address is required"));
327 };
328 let container_id = match self.container_id {
329 Some(s) => Some(
330 s.parse::<reqwest::header::HeaderValue>()
331 .map_err(|_| BuilderError("Failed to parse container ID into header"))?,
332 ),
333 _ => None,
334 };
335
336 let buffer = Arc::new(Mutex::new(Vec::new()));
337 let exporter_buffer = buffer.clone();
338 let url = format!("http://{}/v0.4/traces", agent_address);
339 let (tx, rx) = mpsc::channel();
340
341 spawn(move || {
342 let client = {
343 let mut builder = reqwest::blocking::Client::builder();
344 if let Some(container_id) = container_id {
345 builder = builder.default_headers(reqwest::header::HeaderMap::from_iter([(
346 reqwest::header::HeaderName::from_static("Datadog-Container-ID"),
347 container_id,
348 )]));
349 };
350 builder.build().expect("Failed to build reqwest client")
351 };
352
353 loop {
354 if rx.try_recv().is_ok() {
355 break;
356 }
357
358 sleep(Duration::from_secs(5));
359
360 let spans = exporter_buffer
361 .lock()
362 .unwrap()
363 .drain(..)
364 .collect::<Vec<_>>();
365 if spans.is_empty() {
366 continue;
367 }
368
369 let mut body = vec![0b10010001];
370 let _ = spans
371 .serialize(&mut MpSerializer::new(&mut body).with_struct_map())
372 .inspect_err(|error| println!("Error serializing spans: {error:?}"));
373
374 let _ = client
375 .post(&url)
376 .header("Datadog-Meta-Tracer-Version", "v1.27.0")
377 .header("Content-Type", "application/msgpack")
378 .body(body)
379 .send()
380 .inspect_err(|error| println!("Error exporting spans: {error:?}"));
381 }
382 });
383
384 Ok(DataDogTraceLayer {
385 buffer,
386 service,
387 env,
388 version,
389 logging_enabled: self.logging_enabled,
390 #[cfg(feature = "http")]
391 with_context: http::WithContext(DataDogTraceLayer::<S>::get_context),
392 shutdown: tx,
393 _registry: PhantomData,
394 })
395 }
396}
397
398fn epoch_ns() -> i64 {
400 SystemTime::now()
401 .duration_since(UNIX_EPOCH)
402 .expect("SystemTime is before UNIX epoch")
403 .as_nanos() as i64
404}
405
406#[derive(Default, Debug, Serialize)]
408struct DataDogSpan {
409 name: String,
410 service: String,
411 r#type: String,
412 resource: String,
413 start: i64,
414 duration: i64,
415 meta: HashMap<String, String>,
416 error_code: i32,
417 span_id: u64,
418 trace_id: u64,
419 parent_id: u64,
420}
421
422struct SpanAttributeVisitor<'a> {
424 dd_span: &'a mut DataDogSpan,
425}
426
427impl<'a> SpanAttributeVisitor<'a> {
428 fn new(dd_span: &'a mut DataDogSpan) -> Self {
429 Self { dd_span }
430 }
431}
432
433impl<'a> Visit for SpanAttributeVisitor<'a> {
434 fn record_str(&mut self, field: &Field, value: &str) {
435 match field.name() {
437 "service" => self.dd_span.service = value.to_string(),
438 "span.type" => self.dd_span.r#type = value.to_string(),
439 "operation" => self.dd_span.name = value.to_string(),
440 "resource" => self.dd_span.resource = value.to_string(),
441 name => {
442 self.dd_span
443 .meta
444 .insert(name.to_string(), value.to_string());
445 }
446 };
447 }
448
449 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
450 match field.name() {
451 "service" => self.dd_span.service = format!("{value:?}"),
452 "span.type" => self.dd_span.r#type = format!("{value:?}"),
453 "operation" => self.dd_span.name = format!("{value:?}"),
454 "resource" => self.dd_span.resource = format!("{value:?}"),
455 name => {
456 self.dd_span
457 .meta
458 .insert(name.to_string(), format!("{value:?}"));
459 }
460 };
461 }
462}
463
464#[derive(Serialize)]
466struct DataDogLog {
467 timestamp: Timestamp,
468 #[serde(serialize_with = "serialize_level")]
469 level: Level,
470 message: String,
471 #[serde(rename = "dd.trace_id", skip_serializing_if = "Option::is_none")]
472 trace_id: Option<u64>,
473 #[serde(rename = "dd.span_id", skip_serializing_if = "Option::is_none")]
474 span_id: Option<u64>,
475}
476
477fn serialize_level<S: Serializer>(level: &Level, serializer: S) -> Result<S::Ok, S::Error> {
479 serializer.serialize_str(level.as_str())
480}
481
482#[derive(Default)]
484struct FieldVisitor {
485 fields: HashMap<String, String>,
486}
487
488impl Visit for FieldVisitor {
489 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
490 self.fields
491 .insert(field.name().to_string(), format!("{value:?}"));
492 }
493}
494
495#[cfg(feature = "http")]
496#[doc = "Functionality for working with distributed tracing HTTP headers"]
497pub mod http {
498 use crate::DataDogSpan;
499 use http::{HeaderMap, HeaderName};
500 use tracing_core::{Dispatch, span::Id};
501
502 #[derive(Copy, Clone, Default)]
505 pub struct DataDogContext {
506 trace_id: u128,
507 parent_id: u64,
508 }
509
510 impl DataDogContext {
511 pub fn from_w3c_headers(headers: &HeaderMap) -> Self {
536 Self::parse_w3c_headers(headers).unwrap_or_default()
537 }
538
539 fn parse_w3c_headers(headers: &HeaderMap) -> Option<Self> {
540 let header = headers.get("traceparent")?.to_str().ok()?;
541
542 let parts: Vec<&str> = header.split('-').collect();
543 if parts.len() != 4 {
544 return None;
545 }
546
547 let Some(0) = u8::from_str_radix(parts[0], 16).ok() else {
548 return None;
549 };
550
551 let trace_id = u128::from_str_radix(parts[1], 16).ok()?;
552 let parent_id = u64::from_str_radix(parts[2], 16).ok()?;
553
554 Some(Self {
555 trace_id,
556 parent_id,
557 })
558 }
559
560 pub fn to_w3c_headers(&self) -> HeaderMap {
576 let header = format!(
577 "{version:02x}-{trace_id:032x}-{parent_id:016x}-{trace_flags:02x}",
578 version = 0,
579 trace_id = self.trace_id,
580 parent_id = self.parent_id,
581 trace_flags = 1,
582 );
583
584 HeaderMap::from_iter([(
585 HeaderName::from_static("traceparent"),
586 header.parse().unwrap(),
587 )])
588 }
589 }
590
591 pub(crate) struct WithContext(
594 #[allow(clippy::type_complexity)]
595 pub(crate) fn(&Dispatch, &Id, f: &mut dyn FnMut(&mut DataDogSpan)),
596 );
597
598 impl WithContext {
599 pub(crate) fn with_context(
600 &self,
601 dispatch: &Dispatch,
602 id: &Id,
603 mut f: &mut dyn FnMut(&mut DataDogSpan),
604 ) {
605 self.0(dispatch, id, &mut f);
606 }
607 }
608
609 pub trait DistributedTracingContext {
610 fn get_context(&self) -> DataDogContext;
612
613 fn set_context(&self, context: DataDogContext);
615 }
616
617 impl DistributedTracingContext for tracing::Span {
618 fn get_context(&self) -> DataDogContext {
619 let mut ctx = None;
620
621 self.with_subscriber(|(id, subscriber)| {
622 let Some(get_context) = subscriber.downcast_ref::<WithContext>() else {
623 return;
624 };
625 get_context.with_context(subscriber, id, &mut |dd_span| {
626 ctx = Some(DataDogContext {
627 trace_id: dd_span.trace_id as u128,
629 parent_id: dd_span.parent_id,
630 })
631 });
632 });
633
634 ctx.unwrap_or_default()
635 }
636
637 fn set_context(&self, context: DataDogContext) {
638 self.with_subscriber(move |(id, subscriber)| {
639 let Some(get_context) = subscriber.downcast_ref::<WithContext>() else {
640 return;
641 };
642 get_context.with_context(subscriber, id, &mut |dd_span| {
643 dd_span.trace_id = context.trace_id as u64;
645 dd_span.parent_id = context.parent_id;
646 })
647 });
648 }
649 }
650
651 #[cfg(test)]
652 mod tests {
653 use super::*;
654 use crate::DataDogTraceLayer;
655 use rand::random;
656 use tracing::info_span;
657 use tracing_subscriber::layer::SubscriberExt;
658
659 #[test]
660 fn w3c_trace_header_round_trip() {
661 let context = DataDogContext {
662 trace_id: random(),
663 parent_id: random(),
664 };
665
666 let headers = context.to_w3c_headers();
667 let parsed = DataDogContext::parse_w3c_headers(&headers).unwrap();
668
669 assert_eq!(context.trace_id, parsed.trace_id);
670 assert_eq!(context.parent_id, parsed.parent_id);
671 }
672
673 #[test]
674 fn span_context_round_trip() {
675 tracing::subscriber::with_default(
676 tracing_subscriber::registry().with(
677 DataDogTraceLayer::builder()
678 .service("test-service")
679 .env("test")
680 .version("test-version")
681 .agent_address("localhost:8126")
682 .build()
683 .unwrap(),
684 ),
685 || {
686 let context = DataDogContext {
687 trace_id: random::<u64>() as u128,
689 parent_id: random(),
690 };
691
692 let span = info_span!("test");
693
694 span.set_context(context);
695 let result = span.get_context();
696
697 assert_eq!(context.trace_id, result.trace_id);
698 assert_eq!(context.parent_id, result.parent_id);
699 },
700 );
701 }
702 }
703}