1#![doc = include_str!("../README.md")]
2
3use jiff::{Timestamp, Zoned};
4use rmp_serde::Serializer as MpSerializer;
5use serde::{Serialize, Serializer};
6use std::{
7 collections::{BTreeMap, HashMap},
8 fmt::{Debug, Display, Formatter, Write},
9 marker::PhantomData,
10 ops::DerefMut,
11 sync::{Arc, Mutex, mpsc},
12 thread::{sleep, spawn},
13 time::{Duration, SystemTime, UNIX_EPOCH},
14};
15use tracing_core::{
16 Event, Field, Level, Subscriber,
17 field::Visit,
18 span::{Attributes, Id, Record},
19};
20use tracing_subscriber::{
21 Layer,
22 layer::Context,
23 registry::{LookupSpan, Scope},
24};
25
26#[derive(Debug)]
44pub struct DatadogTraceLayer<S> {
45 buffer: Arc<Mutex<Vec<DatadogSpan>>>,
46 service: String,
47 default_tags: HashMap<String, String>,
48 logging_enabled: bool,
49 #[cfg(feature = "http")]
50 with_context: http::WithContext,
51 shutdown: mpsc::Sender<()>,
52 _registry: PhantomData<S>,
53}
54
55impl<S> DatadogTraceLayer<S>
56where
57 S: Subscriber + for<'a> LookupSpan<'a>,
58{
59 pub fn builder() -> DatadogTraceLayerBuilder<S> {
61 DatadogTraceLayerBuilder {
62 service: None,
63 default_tags: HashMap::new(),
64 agent_address: None,
65 container_id: None,
66 logging_enabled: false,
67 phantom_data: Default::default(),
68 }
69 }
70
71 #[cfg(feature = "http")]
72 fn get_context(
73 dispatch: &tracing_core::Dispatch,
74 id: &Id,
75 f: &mut dyn FnMut(&mut DatadogSpan),
76 ) {
77 let subscriber = dispatch
78 .downcast_ref::<S>()
79 .expect("Subscriber did not downcast to expected type, this is a bug");
80 let span = subscriber.span(id).expect("Span not found, this is a bug");
81
82 let mut extensions = span.extensions_mut();
83 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
84 f(dd_span);
85 }
86 }
87}
88
89impl<S> Drop for DatadogTraceLayer<S> {
90 fn drop(&mut self) {
91 let _ = self.shutdown.send(());
92 }
93}
94
95impl<S> Layer<S> for DatadogTraceLayer<S>
96where
97 S: Subscriber + for<'a> LookupSpan<'a>,
98{
99 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
100 let span = ctx.span(id).expect("Span not found, this is a bug");
101 let mut extensions = span.extensions_mut();
102
103 let trace_id = span
104 .parent()
105 .map(|parent| {
106 parent
107 .extensions()
108 .get::<DatadogSpan>()
109 .expect("Parent span didn't have a DatadogSpan extension, this is a bug")
110 .trace_id
111 })
112 .unwrap_or(rand::random_range(1..=u64::MAX));
113
114 debug_assert!(trace_id != 0, "Trace ID is zero, this is a bug");
115
116 let mut dd_span = DatadogSpan {
117 name: span.name().to_string(),
118 service: self.service.clone(),
119 r#type: "internal".into(),
120 span_id: span.id().into_u64(),
121 start: epoch_ns(),
122 parent_id: span
123 .parent()
124 .map(|parent| parent.id().into_u64())
125 .unwrap_or_default(),
126 trace_id,
127 meta: self.default_tags.clone(),
128 ..Default::default()
129 };
130
131 attrs.record(&mut SpanAttributeVisitor::new(&mut dd_span));
132
133 extensions.insert(dd_span);
134 }
135
136 fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
137 let span = ctx.span(id).expect("Span not found, this is a bug");
138 let mut extensions = span.extensions_mut();
139
140 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
141 values.record(&mut SpanAttributeVisitor::new(dd_span));
142 }
143 }
144
145 fn on_follows_from(&self, id: &Id, follows: &Id, ctx: Context<'_, S>) {
146 let span = ctx.span(id).expect("Span not found, this is a bug");
147 let mut extensions = span.extensions_mut();
148
149 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
150 dd_span.parent_id = follows.into_u64();
151 }
152 }
153
154 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
155 if !self.logging_enabled {
156 return;
157 }
158
159 let mut fields = {
160 let mut visitor = FieldVisitor::default();
161 event.record(&mut visitor);
162 visitor.fields
163 };
164
165 let mut message = fields.remove("message").unwrap_or_default();
166
167 fields.extend(
168 ctx.event_scope(event)
169 .into_iter()
170 .flat_map(Scope::from_root)
171 .flat_map(|span| match span.extensions().get::<DatadogSpan>() {
172 Some(dd_span) => dd_span.meta.clone(),
173 None => panic!("DatadogSpan extension not found, this is a bug"),
174 }),
175 );
176
177 fields
178 .into_iter()
179 .try_for_each(|(k, v)| write!(&mut message, " {k}={v}"))
180 .expect("Failed to write log message");
181
182 let (trace_id, span_id) = ctx
183 .lookup_current()
184 .and_then(|span| {
185 span.extensions()
186 .get::<DatadogSpan>()
187 .map(|dd_span| (Some(dd_span.trace_id), Some(dd_span.span_id)))
188 })
189 .unwrap_or_default();
190
191 let log = DatadogLog {
192 timestamp: Zoned::now().timestamp(),
193 level: event.metadata().level().to_owned(),
194 message,
195 trace_id,
196 span_id,
197 };
198
199 let serialized = serde_json::to_string(&log).expect("Failed to serialize log");
200
201 println!("{serialized}");
202 }
203
204 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
205 let span = ctx.span(id).expect("Span not found, this is a bug");
206 let mut extensions = span.extensions_mut();
207
208 let now = epoch_ns();
209
210 match extensions.get_mut::<DatadogSpan>() {
211 Some(dd_span) if dd_span.start == 0 => dd_span.start = now,
212 _ => {}
213 }
214 }
215
216 fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
217 let span = ctx.span(id).expect("Span not found, this is a bug");
218 let mut extensions = span.extensions_mut();
219
220 let now = epoch_ns();
221
222 if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
223 dd_span.duration = now - dd_span.start
224 }
225 }
226
227 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
228 let span = ctx.span(&id).expect("Span not found, this is a bug");
229 let mut extensions = span.extensions_mut();
230
231 if let Some(dd_span) = extensions.remove::<DatadogSpan>() {
232 self.buffer.lock().unwrap().push(dd_span);
233 }
234 }
235
236 #[cfg(feature = "http")]
239 unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
240 match id {
241 id if id == std::any::TypeId::of::<Self>() => Some(self as *const _ as *const ()),
242 id if id == std::any::TypeId::of::<http::WithContext>() => {
243 Some(&self.with_context as *const _ as *const ())
244 }
245 _ => None,
246 }
247 }
248}
249
/// Collects the configuration for a [`DatadogTraceLayer`].
///
/// Obtained from [`DatadogTraceLayer::builder`]; finish with
/// [`DatadogTraceLayerBuilder::build`].
pub struct DatadogTraceLayerBuilder<S> {
    /// Service name; `build` fails without it.
    service: Option<String>,
    /// Tags attached to every span, including the required `env` and `version` entries.
    default_tags: HashMap<String, String>,
    /// `host:port` of the agent that receives traces; `build` fails without it.
    agent_address: Option<String>,
    /// Optional value for the `datadog-container-id` request header.
    container_id: Option<String>,
    /// Whether events should be printed as JSON logs.
    logging_enabled: bool,
    /// Carries the subscriber type through to the built layer.
    phantom_data: PhantomData<S>,
}
259
/// Error describing why a layer could not be built.
///
/// Wraps a static, human-readable reason that is shown verbatim by `Display`.
#[derive(Debug)]
pub struct BuilderError(&'static str);

impl Display for BuilderError {
    /// Writes the wrapped reason without any decoration.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(reason) = self;
        f.write_str(reason)
    }
}

impl std::error::Error for BuilderError {}
271
272impl<S> DatadogTraceLayerBuilder<S>
273where
274 S: Subscriber + for<'a> LookupSpan<'a>,
275{
276 pub fn service(mut self, service: impl Into<String>) -> Self {
278 self.service = Some(service.into());
279 self
280 }
281
282 pub fn env(mut self, env: impl Into<String>) -> Self {
284 self.default_tags.insert("env".into(), env.into());
285 self
286 }
287
288 pub fn version(mut self, version: impl Into<String>) -> Self {
290 self.default_tags.insert("version".into(), version.into());
291 self
292 }
293
294 pub fn agent_address(mut self, agent_address: impl Into<String>) -> Self {
296 self.agent_address = Some(agent_address.into());
297 self
298 }
299
300 pub fn default_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
306 let _ = self.default_tags.insert(key.into(), value.into());
307 self
308 }
309
310 pub fn container_id(mut self, container_id: impl Into<String>) -> Self {
312 self.container_id = Some(container_id.into());
313 self
314 }
315
316 pub fn enable_logs(mut self, enable_logs: bool) -> Self {
319 self.logging_enabled = enable_logs;
320 self
321 }
322
323 pub fn build(self) -> Result<DatadogTraceLayer<S>, BuilderError> {
325 let Some(service) = self.service else {
326 return Err(BuilderError("service is required"));
327 };
328 if !self.default_tags.contains_key("env") {
329 return Err(BuilderError("env is required"));
330 };
331 if !self.default_tags.contains_key("version") {
332 return Err(BuilderError("version is required"));
333 };
334 let Some(agent_address) = self.agent_address else {
335 return Err(BuilderError("agent_address is required"));
336 };
337 let container_id = match self.container_id {
338 Some(s) => Some(
339 s.parse::<reqwest::header::HeaderValue>()
340 .map_err(|_| BuilderError("Failed to parse container ID into header"))?,
341 ),
342 _ => None,
343 };
344
345 let buffer = Arc::new(Mutex::new(Vec::new()));
346 let exporter_buffer = buffer.clone();
347 let url = format!("http://{}/v0.4/traces", agent_address);
348 let (tx, rx) = mpsc::channel();
349
350 spawn(move || {
351 let client = {
352 let mut builder = reqwest::blocking::Client::builder();
353 if let Some(container_id) = container_id {
354 builder = builder.default_headers(reqwest::header::HeaderMap::from_iter([(
355 reqwest::header::HeaderName::from_static("datadog-container-id"),
356 container_id,
357 )]));
358 };
359 builder.build().expect("Failed to build reqwest client")
360 };
361 let mut spans = Vec::new();
362
363 loop {
364 if rx.try_recv().is_ok() {
365 break;
366 }
367
368 sleep(Duration::from_secs(5));
369
370 std::mem::swap(&mut spans, exporter_buffer.lock().unwrap().deref_mut());
371
372 if spans.is_empty() {
373 continue;
374 }
375
376 let mut body = vec![0b10010001];
377 let _ = spans
378 .serialize(&mut MpSerializer::new(&mut body).with_struct_map())
379 .inspect_err(|error| println!("Error serializing spans: {error:?}"));
380
381 spans.clear();
382
383 let _ = client
384 .post(&url)
385 .header("Datadog-Meta-Tracer-Version", "v1.27.0")
386 .header("Content-Type", "application/msgpack")
387 .body(body)
388 .send()
389 .inspect_err(|error| println!("Error exporting spans: {error:?}"));
390 }
391 });
392
393 Ok(DatadogTraceLayer {
394 buffer,
395 service,
396 default_tags: self.default_tags,
397 logging_enabled: self.logging_enabled,
398 #[cfg(feature = "http")]
399 with_context: http::WithContext(DatadogTraceLayer::<S>::get_context),
400 shutdown: tx,
401 _registry: PhantomData,
402 })
403 }
404}
405
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn epoch_ns() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("SystemTime is before UNIX epoch");

    // `as_nanos` yields a `u128`; saturate instead of letting an `as` cast wrap around
    // into a negative timestamp if the value ever exceeds `i64::MAX`.
    i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX)
}
413
414#[derive(Default, Debug, Serialize)]
416struct DatadogSpan {
417 trace_id: u64,
418 span_id: u64,
419 parent_id: u64,
420 start: i64,
421 duration: i64,
422 name: String,
424 service: String,
425 r#type: String,
426 resource: String,
427 meta: HashMap<String, String>,
428 error_code: i32,
429}
430
431struct SpanAttributeVisitor<'a> {
433 dd_span: &'a mut DatadogSpan,
434}
435
436impl<'a> SpanAttributeVisitor<'a> {
437 fn new(dd_span: &'a mut DatadogSpan) -> Self {
438 Self { dd_span }
439 }
440}
441
442impl<'a> Visit for SpanAttributeVisitor<'a> {
443 fn record_str(&mut self, field: &Field, value: &str) {
444 match field.name() {
446 "service" => self.dd_span.service = value.to_string(),
447 "span.type" => self.dd_span.r#type = value.to_string(),
448 "operation" => self.dd_span.name = value.to_string(),
449 "resource" => self.dd_span.resource = value.to_string(),
450 name => {
451 self.dd_span
452 .meta
453 .insert(name.to_string(), value.to_string());
454 }
455 };
456 }
457
458 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
459 match field.name() {
460 "service" => self.dd_span.service = format!("{value:?}"),
461 "span.type" => self.dd_span.r#type = format!("{value:?}"),
462 "operation" => self.dd_span.name = format!("{value:?}"),
463 "resource" => self.dd_span.resource = format!("{value:?}"),
464 name => {
465 self.dd_span
466 .meta
467 .insert(name.to_string(), format!("{value:?}"));
468 }
469 };
470 }
471}
472
473#[derive(Serialize)]
475struct DatadogLog {
476 timestamp: Timestamp,
477 #[serde(serialize_with = "serialize_level")]
478 level: Level,
479 message: String,
480 #[serde(rename = "dd.trace_id", skip_serializing_if = "Option::is_none")]
481 trace_id: Option<u64>,
482 #[serde(rename = "dd.span_id", skip_serializing_if = "Option::is_none")]
483 span_id: Option<u64>,
484}
485
486fn serialize_level<S: Serializer>(level: &Level, serializer: S) -> Result<S::Ok, S::Error> {
488 serializer.serialize_str(level.as_str())
489}
490
/// Field visitor that collects event fields as strings.
#[derive(Default)]
struct FieldVisitor {
    /// Field name to rendered value, kept sorted by name.
    fields: BTreeMap<String, String>,
}
496
497impl Visit for FieldVisitor {
498 fn record_str(&mut self, field: &Field, value: &str) {
499 self.fields
500 .insert(field.name().to_string(), value.to_string());
501 }
502
503 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
504 self.fields
505 .insert(field.name().to_string(), format!("{value:?}"));
506 }
507}
508
#[cfg(feature = "http")]
#[doc = "Functionality for working with distributed tracing HTTP headers"]
pub mod http {
    use crate::DatadogSpan;
    use http::{HeaderMap, HeaderName};
    use tracing_core::{Dispatch, span::Id};

    /// Trace context that can be carried across service boundaries.
    ///
    /// A context with a zero trace ID or a zero parent ID counts as empty.
    #[derive(Copy, Clone, Default)]
    pub struct DatadogContext {
        trace_id: u128,
        parent_id: u64,
    }

    /// Returns `field` when it consists of exactly `len` ASCII hex digits.
    ///
    /// `from_str_radix` on its own also accepts a leading `+` and fields of any length,
    /// neither of which is a valid `traceparent` field.
    fn hex_field(field: &str, len: usize) -> Option<&str> {
        let well_formed = field.len() == len && field.bytes().all(|b| b.is_ascii_hexdigit());
        well_formed.then_some(field)
    }

    impl DatadogContext {
        /// Reads the context from a W3C `traceparent` header.
        ///
        /// Returns an empty context when the header is missing, malformed, of a version
        /// other than `00`, or does not have the sampled flag set.
        pub fn from_w3c_headers(headers: &HeaderMap) -> Self {
            Self::parse_w3c_headers(headers).unwrap_or_default()
        }

        /// Parses `traceparent`, returning `None` for anything that is not accepted.
        fn parse_w3c_headers(headers: &HeaderMap) -> Option<Self> {
            let header = headers.get("traceparent")?.to_str().ok()?;

            let parts: Vec<&str> = header.split('-').collect();
            let &[version, trace_id, parent_id, flags] = parts.as_slice() else {
                return None;
            };

            // Only version 00 is understood.
            let version = u8::from_str_radix(hex_field(version, 2)?, 16).ok()?;
            if version != 0 {
                return None;
            }

            // Contexts without the sampled flag are ignored.
            let flags = u8::from_str_radix(hex_field(flags, 2)?, 16).ok()?;
            if flags & 0x01 != 0x01 {
                return None;
            }

            let trace_id = u128::from_str_radix(hex_field(trace_id, 32)?, 16).ok()?;
            let parent_id = u64::from_str_radix(hex_field(parent_id, 16)?, 16).ok()?;

            Some(Self {
                trace_id,
                parent_id,
            })
        }

        /// Renders the context as a W3C `traceparent` header with version `00` and the
        /// sampled flag set.
        ///
        /// Returns an empty map for an empty context.
        pub fn to_w3c_headers(&self) -> HeaderMap {
            if self.is_empty() {
                return Default::default();
            }

            let header = format!(
                "{version:02x}-{trace_id:032x}-{parent_id:016x}-{trace_flags:02x}",
                version = 0,
                trace_id = self.trace_id,
                parent_id = self.parent_id,
                trace_flags = 1,
            );

            HeaderMap::from_iter([(
                HeaderName::from_static("traceparent"),
                header
                    .parse()
                    .expect("hex digits and dashes are always a valid header value"),
            )])
        }

        /// Returns `true` when either ID is zero.
        fn is_empty(&self) -> bool {
            self.trace_id == 0 || self.parent_id == 0
        }
    }

    /// Function pointer that gives access to the `DatadogSpan` of a span, found by
    /// downcasting a `Dispatch`.
    #[derive(Debug)]
    pub(crate) struct WithContext(
        #[allow(clippy::type_complexity)]
        pub(crate) fn(&Dispatch, &Id, f: &mut dyn FnMut(&mut DatadogSpan)),
    );

    impl WithContext {
        /// Calls the wrapped function with `f` as the callback.
        pub(crate) fn with_context(
            &self,
            dispatch: &Dispatch,
            id: &Id,
            f: &mut dyn FnMut(&mut DatadogSpan),
        ) {
            // `f` already has the expected type; pass it straight through instead of
            // wrapping it in another layer of `&mut`.
            (self.0)(dispatch, id, f);
        }
    }

    /// Reading and writing the distributed tracing context of a span.
    pub trait DistributedTracingContext {
        /// Returns the context of the span, or an empty context when none is available.
        fn get_context(&self) -> DatadogContext;

        /// Applies `context` to the span; an empty context is ignored.
        fn set_context(&self, context: DatadogContext);
    }

    impl DistributedTracingContext for tracing::Span {
        /// Reads the trace ID and parent ID stored for this span.
        fn get_context(&self) -> DatadogContext {
            let mut found = None;

            self.with_subscriber(|(id, subscriber)| {
                let Some(accessor) = subscriber.downcast_ref::<WithContext>() else {
                    return;
                };
                accessor.with_context(subscriber, id, &mut |dd_span| {
                    found = Some(DatadogContext {
                        trace_id: dd_span.trace_id as u128,
                        parent_id: dd_span.parent_id,
                    })
                });
            });

            found.unwrap_or_default()
        }

        /// Overwrites the trace ID and parent ID stored for this span.
        ///
        /// Only the low 64 bits of the context's trace ID are kept.
        fn set_context(&self, context: DatadogContext) {
            if context.is_empty() {
                return;
            }

            self.with_subscriber(move |(id, subscriber)| {
                let Some(accessor) = subscriber.downcast_ref::<WithContext>() else {
                    return;
                };
                accessor.with_context(subscriber, id, &mut |dd_span| {
                    dd_span.trace_id = context.trace_id as u64;
                    dd_span.parent_id = context.parent_id;
                })
            });
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;
        use crate::DatadogTraceLayer;
        use rand::random_range;
        use tracing::info_span;
        use tracing_subscriber::layer::SubscriberExt;

        /// Builds a header map holding a single `traceparent` header.
        fn traceparent(value: &str) -> HeaderMap {
            HeaderMap::from_iter([(
                HeaderName::from_static("traceparent"),
                value.parse().unwrap(),
            )])
        }

        /// Runs `f` with a registry that has a test layer installed.
        fn with_test_subscriber(f: impl FnOnce()) {
            let layer = DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
                .service("test-service")
                .env("test")
                .version("test-version")
                .agent_address("localhost:8126")
                .build()
                .unwrap();

            tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), f);
        }

        #[test]
        fn w3c_trace_header_round_trip() {
            let context = DatadogContext {
                trace_id: random_range(1..=u128::MAX),
                parent_id: random_range(1..=u64::MAX),
            };

            let headers = context.to_w3c_headers();
            let parsed = DatadogContext::from_w3c_headers(&headers);

            assert_eq!(context.trace_id, parsed.trace_id);
            assert_eq!(context.parent_id, parsed.parent_id);
        }

        #[test]
        fn empty_context_doesnt_produce_w3c_trace_header() {
            assert!(DatadogContext::default().to_w3c_headers().is_empty());
        }

        #[test]
        fn w3c_trace_header_with_wrong_version_produces_empty_context() {
            let headers = traceparent("01-00000000000000000000000000000001-0000000000000001-01");
            let context = DatadogContext::from_w3c_headers(&headers);
            assert!(context.is_empty());
        }

        #[test]
        fn w3c_trace_header_without_sampling_flag_produces_empty_context() {
            let headers = traceparent("00-00000000000000000000000000000001-0000000000000001-00");
            let context = DatadogContext::from_w3c_headers(&headers);
            assert!(context.is_empty());
        }

        #[test]
        fn w3c_trace_header_with_malformed_fields_produces_empty_context() {
            let malformed = [
                // Fields that are too short.
                "0-1-1-1".to_string(),
                // A sign where a hex digit is required.
                format!("00-+{:031x}-{:016x}-01", 1, 1),
                // A non-hex character in the parent ID.
                "00-00000000000000000000000000000001-000000000000000g-01".to_string(),
            ];

            for value in malformed {
                let context = DatadogContext::from_w3c_headers(&traceparent(&value));
                assert!(context.is_empty(), "accepted malformed header {value:?}");
            }
        }

        #[test]
        fn span_context_round_trip() {
            with_test_subscriber(|| {
                let context = DatadogContext {
                    trace_id: random_range(1..=u64::MAX) as u128,
                    parent_id: random_range(1..=u64::MAX),
                };

                let span = info_span!("test");

                span.set_context(context);
                let result = span.get_context();

                assert_eq!(context.trace_id, result.trace_id);
                assert_eq!(context.parent_id, result.parent_id);
            });
        }

        #[test]
        fn empty_span_context_does_not_erase_ids() {
            with_test_subscriber(|| {
                let context = DatadogContext::default();

                let span = info_span!("test");

                span.set_context(context);
                let result = span.get_context();

                assert_ne!(result.trace_id, 0);
                assert_eq!(result.parent_id, 0);
            });
        }
    }
}
795
#[cfg(test)]
mod tests {
    use super::*;

    type TestBuilder = DatadogTraceLayerBuilder<tracing_subscriber::Registry>;

    /// A builder with nothing configured.
    fn empty() -> TestBuilder {
        DatadogTraceLayer::<tracing_subscriber::Registry>::builder()
    }

    /// A builder with every required setting in place.
    fn complete() -> TestBuilder {
        empty()
            .service("test-service")
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126")
    }

    /// The message of the error that building `builder` fails with.
    fn build_error(builder: TestBuilder) -> String {
        builder.build().unwrap_err().to_string()
    }

    #[test]
    fn builder_builds_successfully() {
        assert!(complete().build().is_ok());
    }

    #[test]
    fn service_is_required() {
        let message = build_error(
            empty()
                .env("test")
                .version("test-version")
                .agent_address("localhost:8126"),
        );
        assert!(message.contains("service"));
    }

    #[test]
    fn env_is_required() {
        let message = build_error(
            empty()
                .service("test-service")
                .version("test-version")
                .agent_address("localhost:8126"),
        );
        assert!(message.contains("env"));
    }

    #[test]
    fn version_is_required() {
        let message = build_error(
            empty()
                .service("test-service")
                .env("test")
                .agent_address("localhost:8126"),
        );
        assert!(message.contains("version"));
    }

    #[test]
    fn agent_address_is_required() {
        let message = build_error(
            empty()
                .service("test-service")
                .env("test")
                .version("test-version"),
        );
        assert!(message.contains("agent_address"));
    }

    #[test]
    fn default_default_tags_include_env_and_version() {
        let layer = complete().build().unwrap();

        let tag = |key: &str| layer.default_tags.get(key).map(String::as_str);
        assert_eq!(tag("env"), Some("test"));
        assert_eq!(tag("version"), Some("test-version"));
    }

    #[test]
    fn default_tags_can_be_added() {
        let layer = complete()
            .default_tag("foo", "bar")
            .default_tag("baz", "qux")
            .build()
            .unwrap();

        let tag = |key: &str| layer.default_tags.get(key).map(String::as_str);
        assert_eq!(tag("foo"), Some("bar"));
        assert_eq!(tag("baz"), Some("qux"));
    }
}