1#![doc = include_str!("../README.md")]
2
3use jiff::{Timestamp, Zoned};
4use rmp_serde::Serializer as MpSerializer;
5use serde::{Serialize, Serializer};
6use std::{
7 collections::{BTreeMap, HashMap},
8 fmt::{Debug, Display, Formatter, Write},
9 marker::PhantomData,
10 ops::DerefMut,
11 sync::{Arc, Mutex, mpsc},
12 thread::{sleep, spawn},
13 time::{Duration, SystemTime, UNIX_EPOCH},
14};
15use tracing_core::{
16 Event, Field, Level, Subscriber,
17 field::Visit,
18 span::{Attributes, Id, Record},
19};
20use tracing_subscriber::{
21 Layer,
22 layer::Context,
23 registry::{LookupSpan, Scope},
24};
25
26#[derive(Debug)]
44pub struct DatadogTraceLayer<S> {
45 buffer: Arc<Mutex<Vec<DatadogSpan>>>,
46 service: String,
47 default_tags: HashMap<String, String>,
48 logging_enabled: bool,
49 #[cfg(feature = "http")]
50 with_context: http::WithContext,
51 shutdown: mpsc::Sender<()>,
52 _registry: PhantomData<S>,
53}
54
55impl<S> DatadogTraceLayer<S>
56where
57 S: Subscriber + for<'a> LookupSpan<'a>,
58{
59 pub fn builder() -> DatadogTraceLayerBuilder<S> {
61 DatadogTraceLayerBuilder {
62 service: None,
63 default_tags: HashMap::new(),
64 agent_address: None,
65 container_id: None,
66 logging_enabled: false,
67 phantom_data: Default::default(),
68 }
69 }
70
71 #[cfg(feature = "http")]
72 fn get_context(
73 dispatch: &tracing_core::Dispatch,
74 id: &Id,
75 f: &mut dyn FnMut(&mut DatadogSpan),
76 ) {
77 let subscriber = dispatch
78 .downcast_ref::<S>()
79 .expect("Subscriber did not downcast to expected type, this is a bug");
80 let span = subscriber.span(id).expect("Span not found, this is a bug");
81
82 let mut extensions = span.extensions_mut();
83 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
84 f(dd_span);
85 }
86 }
87}
88
89impl<S> Drop for DatadogTraceLayer<S> {
90 fn drop(&mut self) {
91 let _ = self.shutdown.send(());
92 }
93}
94
95impl<S> Layer<S> for DatadogTraceLayer<S>
96where
97 S: Subscriber + for<'a> LookupSpan<'a>,
98{
99 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
100 let span = ctx.span(id).expect("Span not found, this is a bug");
101 let mut extensions = span.extensions_mut();
102
103 let trace_id = span
104 .parent()
105 .map(|parent| {
106 parent
107 .extensions()
108 .get::<DatadogSpan>()
109 .expect("Parent span didn't have a DatadogSpan extension, this is a bug")
110 .trace_id
111 })
112 .unwrap_or(rand::random_range(1..=u64::MAX));
113
114 debug_assert!(trace_id != 0, "Trace ID is zero, this is a bug");
115
116 let mut dd_span = DatadogSpan {
117 name: span.name().to_string(),
118 service: self.service.clone(),
119 r#type: "internal".into(),
120 span_id: span.id().into_u64(),
121 start: epoch_ns(),
122 parent_id: span
123 .parent()
124 .map(|parent| parent.id().into_u64())
125 .unwrap_or_default(),
126 trace_id,
127 meta: self.default_tags.clone(),
128 ..Default::default()
129 };
130
131 attrs.record(&mut SpanAttributeVisitor::new(&mut dd_span));
132
133 extensions.insert(dd_span);
134 }
135
136 fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
137 let span = ctx.span(id).expect("Span not found, this is a bug");
138 let mut extensions = span.extensions_mut();
139
140 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
141 values.record(&mut SpanAttributeVisitor::new(dd_span));
142 }
143 }
144
145 fn on_follows_from(&self, id: &Id, follows: &Id, ctx: Context<'_, S>) {
146 let span = ctx.span(id).expect("Span not found, this is a bug");
147 let mut extensions = span.extensions_mut();
148
149 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
150 dd_span.parent_id = follows.into_u64();
151 }
152 }
153
154 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
155 if !self.logging_enabled {
156 return;
157 }
158
159 let mut fields = {
160 let mut visitor = FieldVisitor::default();
161 event.record(&mut visitor);
162 visitor.fields
163 };
164
165 let mut message = fields.remove("message").unwrap_or_default();
166
167 fields.extend(
168 ctx.event_scope(event)
169 .into_iter()
170 .flat_map(Scope::from_root)
171 .flat_map(|span| match span.extensions().get::<DatadogSpan>() {
172 Some(dd_span) => dd_span.meta.clone(),
173 None => panic!("DatadogSpan extension not found, this is a bug"),
174 }),
175 );
176
177 fields
178 .into_iter()
179 .try_for_each(|(k, v)| write!(&mut message, " {k}={v}"))
180 .expect("Failed to write log message");
181
182 let (trace_id, span_id) = ctx
183 .lookup_current()
184 .and_then(|span| {
185 span.extensions()
186 .get::<DatadogSpan>()
187 .map(|dd_span| (Some(dd_span.trace_id), Some(dd_span.span_id)))
188 })
189 .unwrap_or_default();
190
191 let log = DatadogLog {
192 timestamp: Zoned::now().timestamp(),
193 level: event.metadata().level().to_owned(),
194 message,
195 trace_id,
196 span_id,
197 };
198
199 let serialized = serde_json::to_string(&log).expect("Failed to serialize log");
200
201 println!("{serialized}");
202 }
203
204 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
205 let span = ctx.span(id).expect("Span not found, this is a bug");
206 let mut extensions = span.extensions_mut();
207
208 let now = epoch_ns();
209
210 match extensions.get_mut::<DatadogSpan>() {
211 Some(dd_span) if dd_span.start == 0 => dd_span.start = now,
212 _ => {}
213 }
214 }
215
216 fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
217 let span = ctx.span(id).expect("Span not found, this is a bug");
218 let mut extensions = span.extensions_mut();
219
220 let now = epoch_ns();
221
222 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
223 dd_span.duration = now - dd_span.start
224 }
225 }
226
227 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
228 let span = ctx.span(&id).expect("Span not found, this is a bug");
229 let mut extensions = span.extensions_mut();
230
231 if let Some(dd_span) = extensions.remove::<DatadogSpan>() {
232 self.buffer.lock().unwrap().push(dd_span);
233 }
234 }
235
236 #[cfg(feature = "http")]
239 unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
240 match id {
241 id if id == std::any::TypeId::of::<Self>() => Some(self as *const _ as *const ()),
242 id if id == std::any::TypeId::of::<http::WithContext>() => {
243 Some(&self.with_context as *const _ as *const ())
244 }
245 _ => None,
246 }
247 }
248}
249
/// Collects the settings for a [`DatadogTraceLayer`].
///
/// `service`, `agent_address` and the `env` and `version` default tags are all required by
/// `build`; everything else is optional.
pub struct DatadogTraceLayerBuilder<S> {
    /// Service name; required.
    service: Option<String>,
    /// Host and port of the Datadog agent; required.
    agent_address: Option<String>,
    /// Optional container ID, sent to the agent as a request header.
    container_id: Option<String>,
    /// Tags attached to every span, including `env` and `version`.
    default_tags: HashMap<String, String>,
    /// Whether the built layer prints events as log lines.
    logging_enabled: bool,
    /// Carries the subscriber type through to the built layer.
    phantom_data: PhantomData<S>,
}
259
/// Error describing why a layer could not be built.
///
/// The wrapped string is the human-readable reason and is what `Display` prints.
#[derive(Debug)]
pub struct BuilderError(&'static str);

impl Display for BuilderError {
    /// Writes the reason verbatim, without applying width or padding options.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(reason) = self;
        f.write_str(reason)
    }
}

impl std::error::Error for BuilderError {}
271
272impl<S> DatadogTraceLayerBuilder<S>
273where
274 S: Subscriber + for<'a> LookupSpan<'a>,
275{
276 pub fn service(mut self, service: impl Into<String>) -> Self {
278 self.service = Some(service.into());
279 self
280 }
281
282 pub fn env(mut self, env: impl Into<String>) -> Self {
284 self.default_tags.insert("env".into(), env.into());
285 self
286 }
287
288 pub fn version(mut self, version: impl Into<String>) -> Self {
290 self.default_tags.insert("version".into(), version.into());
291 self
292 }
293
294 pub fn agent_address(mut self, agent_address: impl Into<String>) -> Self {
296 self.agent_address = Some(agent_address.into());
297 self
298 }
299
300 pub fn default_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
306 let _ = self.default_tags.insert(key.into(), value.into());
307 self
308 }
309
310 pub fn container_id(mut self, container_id: impl Into<String>) -> Self {
312 self.container_id = Some(container_id.into());
313 self
314 }
315
316 pub fn enable_logs(mut self, enable_logs: bool) -> Self {
319 self.logging_enabled = enable_logs;
320 self
321 }
322
323 pub fn build(self) -> Result<DatadogTraceLayer<S>, BuilderError> {
325 let Some(service) = self.service else {
326 return Err(BuilderError("service is required"));
327 };
328 if !self.default_tags.contains_key("env") {
329 return Err(BuilderError("env is required"));
330 };
331 if !self.default_tags.contains_key("version") {
332 return Err(BuilderError("version is required"));
333 };
334 let Some(agent_address) = self.agent_address else {
335 return Err(BuilderError("agent_address is required"));
336 };
337 let container_id = match self.container_id {
338 Some(s) => Some(
339 s.parse::<reqwest::header::HeaderValue>()
340 .map_err(|_| BuilderError("Failed to parse container ID into header"))?,
341 ),
342 _ => None,
343 };
344
345 let buffer = Arc::new(Mutex::new(Vec::new()));
346 let exporter_buffer = buffer.clone();
347 let url = format!("http://{}/v0.4/traces", agent_address);
348 let (tx, rx) = mpsc::channel();
349
350 spawn(move || {
351 let client = {
352 let mut builder = reqwest::blocking::Client::builder();
353 if let Some(container_id) = container_id {
354 builder = builder.default_headers(reqwest::header::HeaderMap::from_iter([(
355 reqwest::header::HeaderName::from_static("datadog-container-id"),
356 container_id,
357 )]));
358 };
359 builder.build().expect("Failed to build reqwest client")
360 };
361 let mut spans = Vec::new();
362
363 loop {
364 if rx.try_recv().is_ok() {
365 break;
366 }
367
368 sleep(Duration::from_secs(5));
369
370 std::mem::swap(&mut spans, exporter_buffer.lock().unwrap().deref_mut());
371
372 if spans.is_empty() {
373 continue;
374 }
375
376 let mut body = vec![0b10010001];
377 let _ = spans
378 .serialize(&mut MpSerializer::new(&mut body).with_struct_map())
379 .inspect_err(|error| println!("Error serializing spans: {error:?}"));
380
381 spans.clear();
382
383 let _ = client
384 .post(&url)
385 .header("Datadog-Meta-Tracer-Version", "v1.27.0")
386 .header("Content-Type", "application/msgpack")
387 .body(body)
388 .send()
389 .inspect_err(|error| println!("Error exporting spans: {error:?}"));
390 }
391 });
392
393 Ok(DatadogTraceLayer {
394 buffer,
395 service,
396 default_tags: self.default_tags,
397 logging_enabled: self.logging_enabled,
398 #[cfg(feature = "http")]
399 with_context: http::WithContext(DatadogTraceLayer::<S>::get_context),
400 shutdown: tx,
401 _registry: PhantomData,
402 })
403 }
404}
405
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock is set to a time before the Unix epoch.
fn epoch_ns() -> i64 {
    let since_epoch: Duration = UNIX_EPOCH
        .elapsed()
        .expect("SystemTime is before UNIX epoch");

    // The 128-bit nanosecond count is narrowed to the 64-bit width used by the span fields.
    since_epoch.as_nanos() as i64
}
413
414#[derive(Default, Debug, Serialize)]
416struct DatadogSpan {
417 trace_id: u64,
418 span_id: u64,
419 parent_id: u64,
420 start: i64,
421 duration: i64,
422 name: String,
424 service: String,
425 r#type: String,
426 resource: String,
427 meta: HashMap<String, String>,
428 error_code: i32,
429}
430
431struct SpanAttributeVisitor<'a> {
433 dd_span: &'a mut DatadogSpan,
434}
435
436impl<'a> SpanAttributeVisitor<'a> {
437 fn new(dd_span: &'a mut DatadogSpan) -> Self {
438 Self { dd_span }
439 }
440}
441
442impl<'a> Visit for SpanAttributeVisitor<'a> {
443 fn record_str(&mut self, field: &Field, value: &str) {
444 match field.name() {
446 "service" => self.dd_span.service = value.to_string(),
447 "span.type" => self.dd_span.r#type = value.to_string(),
448 "operation" => self.dd_span.name = value.to_string(),
449 "resource" => self.dd_span.resource = value.to_string(),
450 name => {
451 self.dd_span
452 .meta
453 .insert(name.to_string(), value.to_string());
454 }
455 };
456 }
457
458 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
459 match field.name() {
460 "service" => self.dd_span.service = format!("{value:?}"),
461 "span.type" => self.dd_span.r#type = format!("{value:?}"),
462 "operation" => self.dd_span.name = format!("{value:?}"),
463 "resource" => self.dd_span.resource = format!("{value:?}"),
464 name => {
465 self.dd_span
466 .meta
467 .insert(name.to_string(), format!("{value:?}"));
468 }
469 };
470 }
471}
472
473#[derive(Serialize)]
475struct DatadogLog {
476 timestamp: Timestamp,
477 #[serde(serialize_with = "serialize_level")]
478 level: Level,
479 message: String,
480 #[serde(rename = "dd.trace_id", skip_serializing_if = "Option::is_none")]
481 trace_id: Option<u64>,
482 #[serde(rename = "dd.span_id", skip_serializing_if = "Option::is_none")]
483 span_id: Option<u64>,
484}
485
486fn serialize_level<S: Serializer>(level: &Level, serializer: S) -> Result<S::Ok, S::Error> {
488 serializer.serialize_str(level.as_str())
489}
490
491#[derive(Default)]
493struct FieldVisitor {
494 fields: BTreeMap<String, String>,
495}
496
497impl Visit for FieldVisitor {
498 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
499 self.fields
500 .insert(field.name().to_string(), format!("{value:?}"));
501 }
502}
503
504#[cfg(feature = "http")]
505#[doc = "Functionality for working with distributed tracing HTTP headers"]
506pub mod http {
507 use crate::DatadogSpan;
508 use http::{HeaderMap, HeaderName};
509 use tracing_core::{Dispatch, span::Id};
510
511 #[derive(Copy, Clone, Default)]
514 pub struct DatadogContext {
515 trace_id: u128,
516 parent_id: u64,
517 }
518
519 impl DatadogContext {
520 pub fn from_w3c_headers(headers: &HeaderMap) -> Self {
545 Self::parse_w3c_headers(headers).unwrap_or_default()
546 }
547
548 fn parse_w3c_headers(headers: &HeaderMap) -> Option<Self> {
549 let header = headers.get("traceparent")?.to_str().ok()?;
550
551 let parts: Vec<&str> = header.split('-').collect();
552 if parts.len() != 4 {
553 return None;
554 }
555
556 let Some(0) = u8::from_str_radix(parts[0], 16).ok() else {
557 return None;
558 };
559
560 let trace_id = u128::from_str_radix(parts[1], 16).ok()?;
561 let parent_id = u64::from_str_radix(parts[2], 16).ok()?;
562
563 Some(Self {
564 trace_id,
565 parent_id,
566 })
567 }
568
569 pub fn to_w3c_headers(&self) -> HeaderMap {
585 if self.is_empty() {
586 return Default::default();
587 }
588
589 let header = format!(
590 "{version:02x}-{trace_id:032x}-{parent_id:016x}-{trace_flags:02x}",
591 version = 0,
592 trace_id = self.trace_id,
593 parent_id = self.parent_id,
594 trace_flags = 1,
595 );
596
597 HeaderMap::from_iter([(
598 HeaderName::from_static("traceparent"),
599 header.parse().unwrap(),
600 )])
601 }
602
603 fn is_empty(&self) -> bool {
606 self.trace_id == 0 || self.parent_id == 0
607 }
608 }
609
610 #[derive(Debug)]
613 pub(crate) struct WithContext(
614 #[allow(clippy::type_complexity)]
615 pub(crate) fn(&Dispatch, &Id, f: &mut dyn FnMut(&mut DatadogSpan)),
616 );
617
618 impl WithContext {
619 pub(crate) fn with_context(
620 &self,
621 dispatch: &Dispatch,
622 id: &Id,
623 mut f: &mut dyn FnMut(&mut DatadogSpan),
624 ) {
625 self.0(dispatch, id, &mut f);
626 }
627 }
628
629 pub trait DistributedTracingContext {
630 fn get_context(&self) -> DatadogContext;
632
633 fn set_context(&self, context: DatadogContext);
635 }
636
637 impl DistributedTracingContext for tracing::Span {
638 fn get_context(&self) -> DatadogContext {
639 let mut ctx = None;
640
641 self.with_subscriber(|(id, subscriber)| {
642 let Some(get_context) = subscriber.downcast_ref::<WithContext>() else {
643 return;
644 };
645 get_context.with_context(subscriber, id, &mut |dd_span| {
646 ctx = Some(DatadogContext {
647 trace_id: dd_span.trace_id as u128,
649 parent_id: dd_span.parent_id,
650 })
651 });
652 });
653
654 ctx.unwrap_or_default()
655 }
656
657 fn set_context(&self, context: DatadogContext) {
658 if context.is_empty() {
660 return;
661 }
662
663 self.with_subscriber(move |(id, subscriber)| {
664 let Some(get_context) = subscriber.downcast_ref::<WithContext>() else {
665 return;
666 };
667 get_context.with_context(subscriber, id, &mut |dd_span| {
668 dd_span.trace_id = context.trace_id as u64;
670 dd_span.parent_id = context.parent_id;
671 })
672 });
673 }
674 }
675
676 #[cfg(test)]
677 mod tests {
678 use super::*;
679 use crate::DatadogTraceLayer;
680 use rand::random_range;
681 use tracing::info_span;
682 use tracing_subscriber::layer::SubscriberExt;
683
684 #[test]
685 fn w3c_trace_header_round_trip() {
686 let context = DatadogContext {
687 trace_id: random_range(1..=u128::MAX),
688 parent_id: random_range(1..=u64::MAX),
689 };
690
691 let headers = context.to_w3c_headers();
692 let parsed = DatadogContext::parse_w3c_headers(&headers).unwrap();
693
694 assert_eq!(context.trace_id, parsed.trace_id);
695 assert_eq!(context.parent_id, parsed.parent_id);
696 }
697
698 #[test]
699 fn empty_context_doesnt_produce_w3c_trace_header() {
700 assert!(DatadogContext::default().to_w3c_headers().is_empty());
701 }
702
703 #[test]
704 fn span_context_round_trip() {
705 tracing::subscriber::with_default(
706 tracing_subscriber::registry().with(
707 DatadogTraceLayer::builder()
708 .service("test-service")
709 .env("test")
710 .version("test-version")
711 .agent_address("localhost:8126")
712 .build()
713 .unwrap(),
714 ),
715 || {
716 let context = DatadogContext {
717 trace_id: random_range(1..=u64::MAX) as u128,
719 parent_id: random_range(1..=u64::MAX),
720 };
721
722 let span = info_span!("test");
723
724 span.set_context(context);
725 let result = span.get_context();
726
727 assert_eq!(context.trace_id, result.trace_id);
728 assert_eq!(context.parent_id, result.parent_id);
729 },
730 );
731 }
732
733 #[test]
734 fn empty_span_context_does_not_erase_ids() {
735 tracing::subscriber::with_default(
736 tracing_subscriber::registry().with(
737 DatadogTraceLayer::builder()
738 .service("test-service")
739 .env("test")
740 .version("test-version")
741 .agent_address("localhost:8126")
742 .build()
743 .unwrap(),
744 ),
745 || {
746 let context = DatadogContext::default();
747
748 let span = info_span!("test");
749
750 span.set_context(context);
751 let result = span.get_context();
752
753 assert_ne!(result.trace_id, 0);
754 assert_eq!(result.parent_id, 0);
755 },
756 );
757 }
758 }
759}
760
#[cfg(test)]
mod tests {
    use super::*;

    type TestBuilder = DatadogTraceLayerBuilder<tracing_subscriber::Registry>;
    type TestLayer = DatadogTraceLayer<tracing_subscriber::Registry>;

    /// A builder with nothing configured.
    fn empty_builder() -> TestBuilder {
        TestLayer::builder()
    }

    /// A builder with every required setting present.
    fn complete_builder() -> TestBuilder {
        empty_builder()
            .service("test-service")
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126")
    }

    /// Builds and returns the rendered error; panics if the build succeeds.
    fn build_error(builder: TestBuilder) -> String {
        builder.build().unwrap_err().to_string()
    }

    #[test]
    fn builder_builds_successfully() {
        assert!(complete_builder().build().is_ok());
    }

    #[test]
    fn service_is_required() {
        let builder = empty_builder()
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126");

        assert!(build_error(builder).contains("service"));
    }

    #[test]
    fn env_is_required() {
        let builder = empty_builder()
            .service("test-service")
            .version("test-version")
            .agent_address("localhost:8126");

        assert!(build_error(builder).contains("env"));
    }

    #[test]
    fn version_is_required() {
        let builder = empty_builder()
            .service("test-service")
            .env("test")
            .agent_address("localhost:8126");

        assert!(build_error(builder).contains("version"));
    }

    #[test]
    fn agent_address_is_required() {
        let builder = empty_builder()
            .service("test-service")
            .env("test")
            .version("test-version");

        assert!(build_error(builder).contains("agent_address"));
    }

    #[test]
    fn default_default_tags_include_env_and_version() {
        let layer: TestLayer = complete_builder().build().unwrap();

        assert_eq!(layer.default_tags["env"], "test");
        assert_eq!(layer.default_tags["version"], "test-version");
    }

    #[test]
    fn default_tags_can_be_added() {
        let layer: TestLayer = complete_builder()
            .default_tag("foo", "bar")
            .default_tag("baz", "qux")
            .build()
            .unwrap();

        assert_eq!(layer.default_tags["foo"], "bar");
        assert_eq!(layer.default_tags["baz"], "qux");
    }
}