1#![doc = include_str!("../README.md")]
2
3use jiff::{Timestamp, Zoned};
4use rmp_serde::Serializer as MpSerializer;
5use serde::{Serialize, Serializer};
6use std::{
7 collections::HashMap,
8 fmt::{Debug, Display, Formatter, Write},
9 marker::PhantomData,
10 sync::{Arc, Mutex, mpsc},
11 thread::{sleep, spawn},
12 time::{Duration, SystemTime, UNIX_EPOCH},
13};
14use tracing_core::{
15 Event, Field, Level, Subscriber,
16 field::Visit,
17 span::{Attributes, Id, Record},
18};
19use tracing_subscriber::{
20 Layer,
21 layer::Context,
22 registry::{LookupSpan, Scope},
23};
24
25#[derive(Debug)]
43pub struct DatadogTraceLayer<S> {
44 buffer: Arc<Mutex<Vec<DatadogSpan>>>,
45 service: String,
46 default_tags: HashMap<String, String>,
47 logging_enabled: bool,
48 #[cfg(feature = "http")]
49 with_context: http::WithContext,
50 shutdown: mpsc::Sender<()>,
51 _registry: PhantomData<S>,
52}
53
54impl<S> DatadogTraceLayer<S>
55where
56 S: Subscriber + for<'a> LookupSpan<'a>,
57{
58 pub fn builder() -> DatadogTraceLayerBuilder<S> {
60 DatadogTraceLayerBuilder {
61 service: None,
62 default_tags: HashMap::new(),
63 agent_address: None,
64 container_id: None,
65 logging_enabled: false,
66 phantom_data: Default::default(),
67 }
68 }
69
70 #[cfg(feature = "http")]
71 fn get_context(
72 dispatch: &tracing_core::Dispatch,
73 id: &Id,
74 f: &mut dyn FnMut(&mut DatadogSpan),
75 ) {
76 let subscriber = dispatch
77 .downcast_ref::<S>()
78 .expect("Subscriber did not downcast to expected type, this is a bug");
79 let span = subscriber.span(id).expect("Span not found, this is a bug");
80
81 let mut extensions = span.extensions_mut();
82 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
83 f(dd_span);
84 }
85 }
86}
87
88impl<S> Drop for DatadogTraceLayer<S> {
89 fn drop(&mut self) {
90 let _ = self.shutdown.send(());
91 }
92}
93
94impl<S> Layer<S> for DatadogTraceLayer<S>
95where
96 S: Subscriber + for<'a> LookupSpan<'a>,
97{
98 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
99 let span = ctx.span(id).expect("Span not found, this is a bug");
100 let mut extensions = span.extensions_mut();
101
102 let trace_id = span
103 .parent()
104 .map(|parent| {
105 parent
106 .extensions()
107 .get::<DatadogSpan>()
108 .expect("Parent span didn't have a DatadogSpan extension, this is a bug")
109 .trace_id
110 })
111 .unwrap_or(rand::random_range(1..=u64::MAX));
112
113 debug_assert!(trace_id != 0, "Trace ID is zero, this is a bug");
114
115 let mut dd_span = DatadogSpan {
116 name: span.name().to_string(),
117 service: self.service.clone(),
118 r#type: "internal".into(),
119 span_id: span.id().into_u64(),
120 start: epoch_ns(),
121 parent_id: span
122 .parent()
123 .map(|parent| parent.id().into_u64())
124 .unwrap_or_default(),
125 trace_id,
126 meta: self.default_tags.clone(),
127 ..Default::default()
128 };
129
130 attrs.record(&mut SpanAttributeVisitor::new(&mut dd_span));
131
132 extensions.insert(dd_span);
133 }
134
135 fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
136 let span = ctx.span(id).expect("Span not found, this is a bug");
137 let mut extensions = span.extensions_mut();
138
139 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
140 values.record(&mut SpanAttributeVisitor::new(dd_span));
141 }
142 }
143
144 fn on_follows_from(&self, id: &Id, follows: &Id, ctx: Context<'_, S>) {
145 let span = ctx.span(id).expect("Span not found, this is a bug");
146 let mut extensions = span.extensions_mut();
147
148 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
149 dd_span.parent_id = follows.into_u64();
150 }
151 }
152
153 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
154 if !self.logging_enabled {
155 return;
156 }
157
158 let mut fields = {
159 let mut visitor = FieldVisitor::default();
160 event.record(&mut visitor);
161 visitor.fields
162 };
163
164 let mut message = fields.remove("message").unwrap_or_default();
165
166 fields.extend(
167 ctx.event_scope(event)
168 .into_iter()
169 .flat_map(Scope::from_root)
170 .flat_map(|span| match span.extensions().get::<DatadogSpan>() {
171 Some(dd_span) => dd_span.meta.clone(),
172 None => panic!("DatadogSpan extension not found, this is a bug"),
173 }),
174 );
175
176 fields
177 .into_iter()
178 .try_for_each(|(k, v)| write!(&mut message, " {k}={v}"))
179 .expect("Failed to write message");
180
181 let (trace_id, span_id) = ctx
182 .lookup_current()
183 .and_then(|span| {
184 span.extensions()
185 .get::<DatadogSpan>()
186 .map(|dd_span| (Some(dd_span.trace_id), Some(dd_span.span_id)))
187 })
188 .unwrap_or_default();
189
190 let log = DatadogLog {
191 timestamp: Zoned::now().timestamp(),
192 level: event.metadata().level().to_owned(),
193 message,
194 trace_id,
195 span_id,
196 };
197
198 let serialized = serde_json::to_string(&log).expect("Failed to serialize log");
199
200 println!("{serialized}");
201 }
202
203 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
204 let span = ctx.span(id).expect("Span not found, this is a bug");
205 let mut extensions = span.extensions_mut();
206
207 let now = epoch_ns();
208
209 match extensions.get_mut::<DatadogSpan>() {
210 Some(dd_span) if dd_span.start == 0 => dd_span.start = now,
211 _ => {}
212 }
213 }
214
215 fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
216 let span = ctx.span(id).expect("Span not found, this is a bug");
217 let mut extensions = span.extensions_mut();
218
219 let now = epoch_ns();
220
221 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
222 dd_span.duration = now - dd_span.start
223 }
224 }
225
226 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
227 let span = ctx.span(&id).expect("Span not found, this is a bug");
228 let mut extensions = span.extensions_mut();
229
230 if let Some(dd_span) = extensions.remove::<DatadogSpan>() {
231 self.buffer.lock().unwrap().push(dd_span);
232 }
233 }
234
235 #[cfg(feature = "http")]
238 unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
239 match id {
240 id if id == std::any::TypeId::of::<Self>() => Some(self as *const _ as *const ()),
241 id if id == std::any::TypeId::of::<http::WithContext>() => {
242 Some(&self.with_context as *const _ as *const ())
243 }
244 _ => None,
245 }
246 }
247}
248
249pub struct DatadogTraceLayerBuilder<S> {
251 service: Option<String>,
252 default_tags: HashMap<String, String>,
253 agent_address: Option<String>,
254 container_id: Option<String>,
255 logging_enabled: bool,
256 phantom_data: PhantomData<S>,
257}
258
259#[derive(Debug)]
261pub struct BuilderError(&'static str);
262
263impl Display for BuilderError {
264 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
265 f.write_str(self.0)
266 }
267}
268
269impl std::error::Error for BuilderError {}
270
271impl<S> DatadogTraceLayerBuilder<S>
272where
273 S: Subscriber + for<'a> LookupSpan<'a>,
274{
275 pub fn service(mut self, service: impl Into<String>) -> Self {
277 self.service = Some(service.into());
278 self
279 }
280
281 pub fn env(mut self, env: impl Into<String>) -> Self {
283 self.default_tags.insert("env".into(), env.into());
284 self
285 }
286
287 pub fn version(mut self, version: impl Into<String>) -> Self {
289 self.default_tags.insert("version".into(), version.into());
290 self
291 }
292
293 pub fn agent_address(mut self, agent_address: impl Into<String>) -> Self {
295 self.agent_address = Some(agent_address.into());
296 self
297 }
298
299 pub fn default_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
305 let _ = self.default_tags.insert(key.into(), value.into());
306 self
307 }
308
309 pub fn container_id(mut self, container_id: impl Into<String>) -> Self {
311 self.container_id = Some(container_id.into());
312 self
313 }
314
315 pub fn enable_logs(mut self, enable_logs: bool) -> Self {
318 self.logging_enabled = enable_logs;
319 self
320 }
321
322 pub fn build(self) -> Result<DatadogTraceLayer<S>, BuilderError> {
324 let Some(service) = self.service else {
325 return Err(BuilderError("service is required"));
326 };
327 if !self.default_tags.contains_key("env") {
328 return Err(BuilderError("env is required"));
329 };
330 if !self.default_tags.contains_key("version") {
331 return Err(BuilderError("version is required"));
332 };
333 let Some(agent_address) = self.agent_address else {
334 return Err(BuilderError("agent_address is required"));
335 };
336 let container_id = match self.container_id {
337 Some(s) => Some(
338 s.parse::<reqwest::header::HeaderValue>()
339 .map_err(|_| BuilderError("Failed to parse container ID into header"))?,
340 ),
341 _ => None,
342 };
343
344 let buffer = Arc::new(Mutex::new(Vec::new()));
345 let exporter_buffer = buffer.clone();
346 let url = format!("http://{}/v0.4/traces", agent_address);
347 let (tx, rx) = mpsc::channel();
348
349 spawn(move || {
350 let client = {
351 let mut builder = reqwest::blocking::Client::builder();
352 if let Some(container_id) = container_id {
353 builder = builder.default_headers(reqwest::header::HeaderMap::from_iter([(
354 reqwest::header::HeaderName::from_static("Datadog-Container-ID"),
355 container_id,
356 )]));
357 };
358 builder.build().expect("Failed to build reqwest client")
359 };
360
361 loop {
362 if rx.try_recv().is_ok() {
363 break;
364 }
365
366 sleep(Duration::from_secs(5));
367
368 let spans = exporter_buffer
369 .lock()
370 .unwrap()
371 .drain(..)
372 .collect::<Vec<_>>();
373 if spans.is_empty() {
374 continue;
375 }
376
377 let mut body = vec![0b10010001];
378 let _ = spans
379 .serialize(&mut MpSerializer::new(&mut body).with_struct_map())
380 .inspect_err(|error| println!("Error serializing spans: {error:?}"));
381
382 let _ = client
383 .post(&url)
384 .header("Datadog-Meta-Tracer-Version", "v1.27.0")
385 .header("Content-Type", "application/msgpack")
386 .body(body)
387 .send()
388 .inspect_err(|error| println!("Error exporting spans: {error:?}"));
389 }
390 });
391
392 Ok(DatadogTraceLayer {
393 buffer,
394 service,
395 default_tags: self.default_tags,
396 logging_enabled: self.logging_enabled,
397 #[cfg(feature = "http")]
398 with_context: http::WithContext(DatadogTraceLayer::<S>::get_context),
399 shutdown: tx,
400 _registry: PhantomData,
401 })
402 }
403}
404
405fn epoch_ns() -> i64 {
407 SystemTime::now()
408 .duration_since(UNIX_EPOCH)
409 .expect("SystemTime is before UNIX epoch")
410 .as_nanos() as i64
411}
412
413#[derive(Default, Debug, Serialize)]
415struct DatadogSpan {
416 trace_id: u64,
417 span_id: u64,
418 parent_id: u64,
419 start: i64,
420 duration: i64,
421 name: String,
423 service: String,
424 r#type: String,
425 resource: String,
426 meta: HashMap<String, String>,
427 error_code: i32,
428}
429
430struct SpanAttributeVisitor<'a> {
432 dd_span: &'a mut DatadogSpan,
433}
434
435impl<'a> SpanAttributeVisitor<'a> {
436 fn new(dd_span: &'a mut DatadogSpan) -> Self {
437 Self { dd_span }
438 }
439}
440
441impl<'a> Visit for SpanAttributeVisitor<'a> {
442 fn record_str(&mut self, field: &Field, value: &str) {
443 match field.name() {
445 "service" => self.dd_span.service = value.to_string(),
446 "span.type" => self.dd_span.r#type = value.to_string(),
447 "operation" => self.dd_span.name = value.to_string(),
448 "resource" => self.dd_span.resource = value.to_string(),
449 name => {
450 self.dd_span
451 .meta
452 .insert(name.to_string(), value.to_string());
453 }
454 };
455 }
456
457 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
458 match field.name() {
459 "service" => self.dd_span.service = format!("{value:?}"),
460 "span.type" => self.dd_span.r#type = format!("{value:?}"),
461 "operation" => self.dd_span.name = format!("{value:?}"),
462 "resource" => self.dd_span.resource = format!("{value:?}"),
463 name => {
464 self.dd_span
465 .meta
466 .insert(name.to_string(), format!("{value:?}"));
467 }
468 };
469 }
470}
471
472#[derive(Serialize)]
474struct DatadogLog {
475 timestamp: Timestamp,
476 #[serde(serialize_with = "serialize_level")]
477 level: Level,
478 message: String,
479 #[serde(rename = "dd.trace_id", skip_serializing_if = "Option::is_none")]
480 trace_id: Option<u64>,
481 #[serde(rename = "dd.span_id", skip_serializing_if = "Option::is_none")]
482 span_id: Option<u64>,
483}
484
485fn serialize_level<S: Serializer>(level: &Level, serializer: S) -> Result<S::Ok, S::Error> {
487 serializer.serialize_str(level.as_str())
488}
489
490#[derive(Default)]
492struct FieldVisitor {
493 fields: HashMap<String, String>,
494}
495
496impl Visit for FieldVisitor {
497 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
498 self.fields
499 .insert(field.name().to_string(), format!("{value:?}"));
500 }
501}
502
503#[cfg(feature = "http")]
504#[doc = "Functionality for working with distributed tracing HTTP headers"]
505pub mod http {
506 use crate::DatadogSpan;
507 use http::{HeaderMap, HeaderName};
508 use tracing_core::{Dispatch, span::Id};
509
510 #[derive(Copy, Clone, Default)]
513 pub struct DatadogContext {
514 trace_id: u128,
515 parent_id: u64,
516 }
517
518 impl DatadogContext {
519 pub fn from_w3c_headers(headers: &HeaderMap) -> Self {
544 Self::parse_w3c_headers(headers).unwrap_or_default()
545 }
546
547 fn parse_w3c_headers(headers: &HeaderMap) -> Option<Self> {
548 let header = headers.get("traceparent")?.to_str().ok()?;
549
550 let parts: Vec<&str> = header.split('-').collect();
551 if parts.len() != 4 {
552 return None;
553 }
554
555 let Some(0) = u8::from_str_radix(parts[0], 16).ok() else {
556 return None;
557 };
558
559 let trace_id = u128::from_str_radix(parts[1], 16).ok()?;
560 let parent_id = u64::from_str_radix(parts[2], 16).ok()?;
561
562 Some(Self {
563 trace_id,
564 parent_id,
565 })
566 }
567
568 pub fn to_w3c_headers(&self) -> HeaderMap {
584 if self.is_empty() {
585 return Default::default();
586 }
587
588 let header = format!(
589 "{version:02x}-{trace_id:032x}-{parent_id:016x}-{trace_flags:02x}",
590 version = 0,
591 trace_id = self.trace_id,
592 parent_id = self.parent_id,
593 trace_flags = 1,
594 );
595
596 HeaderMap::from_iter([(
597 HeaderName::from_static("traceparent"),
598 header.parse().unwrap(),
599 )])
600 }
601
602 fn is_empty(&self) -> bool {
605 self.trace_id == 0 || self.parent_id == 0
606 }
607 }
608
609 #[derive(Debug)]
612 pub(crate) struct WithContext(
613 #[allow(clippy::type_complexity)]
614 pub(crate) fn(&Dispatch, &Id, f: &mut dyn FnMut(&mut DatadogSpan)),
615 );
616
617 impl WithContext {
618 pub(crate) fn with_context(
619 &self,
620 dispatch: &Dispatch,
621 id: &Id,
622 mut f: &mut dyn FnMut(&mut DatadogSpan),
623 ) {
624 self.0(dispatch, id, &mut f);
625 }
626 }
627
628 pub trait DistributedTracingContext {
629 fn get_context(&self) -> DatadogContext;
631
632 fn set_context(&self, context: DatadogContext);
634 }
635
636 impl DistributedTracingContext for tracing::Span {
637 fn get_context(&self) -> DatadogContext {
638 let mut ctx = None;
639
640 self.with_subscriber(|(id, subscriber)| {
641 let Some(get_context) = subscriber.downcast_ref::<WithContext>() else {
642 return;
643 };
644 get_context.with_context(subscriber, id, &mut |dd_span| {
645 ctx = Some(DatadogContext {
646 trace_id: dd_span.trace_id as u128,
648 parent_id: dd_span.parent_id,
649 })
650 });
651 });
652
653 ctx.unwrap_or_default()
654 }
655
656 fn set_context(&self, context: DatadogContext) {
657 if context.is_empty() {
659 return;
660 }
661
662 self.with_subscriber(move |(id, subscriber)| {
663 let Some(get_context) = subscriber.downcast_ref::<WithContext>() else {
664 return;
665 };
666 get_context.with_context(subscriber, id, &mut |dd_span| {
667 dd_span.trace_id = context.trace_id as u64;
669 dd_span.parent_id = context.parent_id;
670 })
671 });
672 }
673 }
674
675 #[cfg(test)]
676 mod tests {
677 use super::*;
678 use crate::DatadogTraceLayer;
679 use rand::random_range;
680 use tracing::info_span;
681 use tracing_subscriber::layer::SubscriberExt;
682
683 #[test]
684 fn w3c_trace_header_round_trip() {
685 let context = DatadogContext {
686 trace_id: random_range(1..=u128::MAX),
687 parent_id: random_range(1..=u64::MAX),
688 };
689
690 let headers = context.to_w3c_headers();
691 let parsed = DatadogContext::parse_w3c_headers(&headers).unwrap();
692
693 assert_eq!(context.trace_id, parsed.trace_id);
694 assert_eq!(context.parent_id, parsed.parent_id);
695 }
696
697 #[test]
698 fn empty_context_doesnt_produce_w3c_trace_header() {
699 assert!(DatadogContext::default().to_w3c_headers().is_empty());
700 }
701
702 #[test]
703 fn span_context_round_trip() {
704 tracing::subscriber::with_default(
705 tracing_subscriber::registry().with(
706 DatadogTraceLayer::builder()
707 .service("test-service")
708 .env("test")
709 .version("test-version")
710 .agent_address("localhost:8126")
711 .build()
712 .unwrap(),
713 ),
714 || {
715 let context = DatadogContext {
716 trace_id: random_range(1..=u64::MAX) as u128,
718 parent_id: random_range(1..=u64::MAX),
719 };
720
721 let span = info_span!("test");
722
723 span.set_context(context);
724 let result = span.get_context();
725
726 assert_eq!(context.trace_id, result.trace_id);
727 assert_eq!(context.parent_id, result.parent_id);
728 },
729 );
730 }
731
732 #[test]
733 fn empty_span_context_does_not_erase_ids() {
734 tracing::subscriber::with_default(
735 tracing_subscriber::registry().with(
736 DatadogTraceLayer::builder()
737 .service("test-service")
738 .env("test")
739 .version("test-version")
740 .agent_address("localhost:8126")
741 .build()
742 .unwrap(),
743 ),
744 || {
745 let context = DatadogContext::default();
746
747 let span = info_span!("test");
748
749 span.set_context(context);
750 let result = span.get_context();
751
752 assert_ne!(result.trace_id, 0);
753 assert_eq!(result.parent_id, 0);
754 },
755 );
756 }
757 }
758}
759
760#[cfg(test)]
761mod tests {
762 use super::*;
763
764 #[test]
765 fn builder_builds_successfully() {
766 assert!(
767 DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
768 .service("test-service")
769 .env("test")
770 .version("test-version")
771 .agent_address("localhost:8126")
772 .build()
773 .is_ok()
774 );
775 }
776
777 #[test]
778 fn service_is_required() {
779 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
780 .env("test")
781 .version("test-version")
782 .agent_address("localhost:8126")
783 .build();
784 assert!(result.unwrap_err().to_string().contains("service"));
785 }
786
787 #[test]
788 fn env_is_required() {
789 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
790 .service("test-service")
791 .version("test-version")
792 .agent_address("localhost:8126")
793 .build();
794 assert!(result.unwrap_err().to_string().contains("env"));
795 }
796
797 #[test]
798 fn version_is_required() {
799 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
800 .service("test-service")
801 .env("test")
802 .agent_address("localhost:8126")
803 .build();
804 assert!(result.unwrap_err().to_string().contains("version"));
805 }
806
807 #[test]
808 fn agent_address_is_required() {
809 let result = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
810 .service("test-service")
811 .env("test")
812 .version("test-version")
813 .build();
814 assert!(result.unwrap_err().to_string().contains("agent_address"));
815 }
816
817 #[test]
818 fn default_default_tags_include_env_and_version() {
819 let layer: DatadogTraceLayer<tracing_subscriber::Registry> = DatadogTraceLayer::builder()
820 .service("test-service")
821 .env("test")
822 .version("test-version")
823 .agent_address("localhost:8126")
824 .build()
825 .unwrap();
826 let default_tags = &layer.default_tags;
827 assert_eq!(default_tags["env"], "test");
828 assert_eq!(default_tags["version"], "test-version");
829 }
830
831 #[test]
832 fn default_tags_can_be_added() {
833 let layer: DatadogTraceLayer<tracing_subscriber::Registry> = DatadogTraceLayer::builder()
834 .service("test-service")
835 .env("test")
836 .version("test-version")
837 .agent_address("localhost:8126")
838 .default_tag("foo", "bar")
839 .default_tag("baz", "qux")
840 .build()
841 .unwrap();
842 let default_tags = &layer.default_tags;
843 assert_eq!(default_tags["foo"], "bar");
844 assert_eq!(default_tags["baz"], "qux");
845 }
846}